You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/05/05 08:37:56 UTC

[arrow] branch master updated: ARROW-2541: [Plasma] Replace macros with constexpr

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bdfff8  ARROW-2541: [Plasma] Replace macros with constexpr
5bdfff8 is described below

commit 5bdfff8612c30b0e458d026e7a11b859663fa00b
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Sat May 5 10:37:48 2018 +0200

    ARROW-2541: [Plasma] Replace macros with constexpr
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #2001 from pcmoritz/remove-macros and squashes the following commits:
    
    cc3a7d24 <Philipp Moritz> fix comments
    3ac436b1 <Philipp Moritz> fix linting
    de7b0b63 <Philipp Moritz> more cleanups
    9024214e <Philipp Moritz> fix linting
    a59e1b5b <Philipp Moritz> fix documentation
    176b7c66 <Philipp Moritz> clean up macros
---
 cpp/apidoc/tutorials/plasma.md      |  8 +++----
 cpp/src/plasma/client.cc            |  4 ++--
 cpp/src/plasma/client.h             | 13 ++++++++---
 cpp/src/plasma/format/plasma.fbs    |  4 +++-
 cpp/src/plasma/io.cc                | 43 ++++++++++++++++++++-----------------
 cpp/src/plasma/io.h                 |  7 +++---
 cpp/src/plasma/plasma.h             |  3 ++-
 cpp/src/plasma/store.cc             |  6 +++---
 cpp/src/plasma/test/client_tests.cc |  6 ++----
 9 files changed, 52 insertions(+), 42 deletions(-)

diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md
index 8d54a10..5a0a2ba 100644
--- a/cpp/apidoc/tutorials/plasma.md
+++ b/cpp/apidoc/tutorials/plasma.md
@@ -80,7 +80,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
   // Disconnect the Plasma client.
   ARROW_CHECK_OK(client.Disconnect());
 }
@@ -218,7 +218,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
   // Create an object with a fixed ObjectID.
   ObjectID object_id = ObjectID::from_binary("00000000000000000000");
   int64_t data_size = 1000;
@@ -323,7 +323,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
   ObjectID object_id = ObjectID::from_binary("00000000000000000000");
   ObjectBuffer object_buffer;
   ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));
@@ -411,7 +411,7 @@ using namespace plasma;
 int main(int argc, char** argv) {
   // Start up and connect a Plasma client.
   PlasmaClient client;
-  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+  ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
 
   int fd;
   ARROW_CHECK_OK(client.Subscribe(&fd));
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index b4ee098..a332686 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -703,8 +703,8 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
   const int num_threads = kThreadPoolSize;
   uint64_t threadhash[num_threads + 1];
   const uint64_t data_address = reinterpret_cast<uint64_t>(data);
-  const uint64_t num_blocks = nbytes / BLOCK_SIZE;
-  const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
+  const uint64_t num_blocks = nbytes / kBlockSize;
+  const uint64_t chunk_size = (num_blocks / num_threads) * kBlockSize;
   const uint64_t right_address = data_address + chunk_size * num_threads;
   const uint64_t suffix = (data_address + nbytes) - right_address;
   // Now the data layout is | k * num_threads * block_size | suffix | ==
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 4221997..4e1ff4a 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -34,7 +34,14 @@ using arrow::Status;
 
 namespace plasma {
 
-#define PLASMA_DEFAULT_RELEASE_DELAY 64
+ARROW_DEPRECATED("PLASMA_DEFAULT_RELEASE_DELAY is deprecated")
+constexpr int64_t kDeprecatedPlasmaDefaultReleaseDelay = 64;
+#define PLASMA_DEFAULT_RELEASE_DELAY kDeprecatedPlasmaDefaultReleaseDelay
+
+/// We keep a queue of unreleased objects cached in the client until we start
+/// sending release requests to the store. This is to avoid frequently mapping
+/// and unmapping objects and evicting data from processor caches.
+constexpr int64_t kPlasmaDefaultReleaseDelay = 64;
 
 /// Object buffer data structure.
 struct ObjectBuffer {
@@ -64,8 +71,8 @@ class ARROW_EXPORT PlasmaClient {
   /// \param num_retries number of attempts to connect to IPC socket, default 50
   /// \return The return status.
   Status Connect(const std::string& store_socket_name,
-                 const std::string& manager_socket_name, int release_delay,
-                 int num_retries = -1);
+                 const std::string& manager_socket_name,
+                 int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
 
   /// Create an object in the Plasma Store. Any metadata for this object must be
   /// be passed in when the object is created.
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 0258cdf..6a58fb0 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -18,8 +18,10 @@
 // Plasma protocol specification
 
 enum MessageType:int {
+  // Message that gets sent when a client hangs up.
+  PlasmaDisconnectClient = 0,
   // Create a new object.
-  PlasmaCreateRequest = 1,
+  PlasmaCreateRequest,
   PlasmaCreateReply,
   PlasmaAbortRequest,
   PlasmaAbortReply,
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index 4142bf9..aefd297 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -24,12 +24,14 @@
 #include "arrow/status.h"
 
 #include "plasma/common.h"
+#include "plasma/plasma_generated.h"
 
 using arrow::Status;
 
-/* Number of times we try connecting to a socket. */
-#define NUM_CONNECT_ATTEMPTS 50
-#define CONNECT_TIMEOUT_MS 100
+/// Number of times we try connecting to a socket.
+constexpr int64_t kNumConnectAttempts = 50;
+/// Time to wait between connection attempts to a socket.
+constexpr int64_t kConnectTimeoutMs = 100;
 
 namespace plasma {
 
@@ -38,8 +40,8 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
   size_t bytesleft = length;
   size_t offset = 0;
   while (bytesleft > 0) {
-    /* While we haven't written the whole message, write to the file descriptor,
-     * advance the cursor, and decrease the amount left to write. */
+    // While we haven't written the whole message, write to the file descriptor,
+    // advance the cursor, and decrease the amount left to write.
     nbytes = write(fd, cursor + offset, bytesleft);
     if (nbytes < 0) {
       if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
@@ -58,7 +60,7 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
 }
 
 Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
-  int64_t version = PLASMA_PROTOCOL_VERSION;
+  int64_t version = kPlasmaProtocolVersion;
   RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)));
   RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), sizeof(type)));
   RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)));
@@ -67,7 +69,7 @@ Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
 
 Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
   ssize_t nbytes = 0;
-  /* Termination condition: EOF or read 'length' bytes total. */
+  // Termination condition: EOF or read 'length' bytes total.
   size_t bytesleft = length;
   size_t offset = 0;
   while (bytesleft > 0) {
@@ -91,20 +93,21 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
 Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
   int64_t version;
   RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
-                     *type = DISCONNECT_CLIENT);
-  ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
+                     *type = MessageType_PlasmaDisconnectClient);
+  ARROW_CHECK(version == kPlasmaProtocolVersion) << "version = " << version;
   RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
-                     *type = DISCONNECT_CLIENT);
+                     *type = MessageType_PlasmaDisconnectClient);
   int64_t length_temp;
   RETURN_NOT_OK_ELSE(
       ReadBytes(fd, reinterpret_cast<uint8_t*>(&length_temp), sizeof(length_temp)),
-      *type = DISCONNECT_CLIENT);
+      *type = MessageType_PlasmaDisconnectClient);
   // The length must be read as an int64_t, but it should be used as a size_t.
   size_t length = static_cast<size_t>(length_temp);
   if (length > buffer->size()) {
     buffer->resize(length);
   }
-  RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT);
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length),
+                     *type = MessageType_PlasmaDisconnectClient);
   return Status::OK();
 }
 
@@ -115,7 +118,7 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
     ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
     return -1;
   }
-  /* Tell the system to allow the port to be reused. */
+  // Tell the system to allow the port to be reused.
   int on = 1;
   if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on),
                  sizeof(on)) < 0) {
@@ -150,23 +153,23 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
 
 Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
                              int64_t timeout, int* fd) {
-  /* Pick the default values if the user did not specify. */
+  // Pick the default values if the user did not specify.
   if (num_retries < 0) {
-    num_retries = NUM_CONNECT_ATTEMPTS;
+    num_retries = kNumConnectAttempts;
   }
   if (timeout < 0) {
-    timeout = CONNECT_TIMEOUT_MS;
+    timeout = kConnectTimeoutMs;
   }
   *fd = connect_ipc_sock(pathname);
   while (*fd < 0 && num_retries > 0) {
     ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
                      << ", retrying " << num_retries << " more times";
-    /* Sleep for timeout milliseconds. */
+    // Sleep for timeout milliseconds.
     usleep(static_cast<int>(timeout * 1000));
     *fd = connect_ipc_sock(pathname);
     --num_retries;
   }
-  /* If we could not connect to the socket, exit. */
+  // If we could not connect to the socket, exit.
   if (*fd == -1) {
     std::stringstream ss;
     ss << "Could not connect to socket " << pathname;
@@ -215,7 +218,7 @@ std::unique_ptr<uint8_t[]> read_message_async(int sock) {
   int64_t size;
   Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
   if (!s.ok()) {
-    /* The other side has closed the socket. */
+    // The other side has closed the socket.
     ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
     close(sock);
     return NULL;
@@ -223,7 +226,7 @@ std::unique_ptr<uint8_t[]> read_message_async(int sock) {
   auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]);
   s = ReadBytes(sock, message.get(), size);
   if (!s.ok()) {
-    /* The other side has closed the socket. */
+    // The other side has closed the socket.
     ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
     close(sock);
     return NULL;
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 8869c9b..f0d57c0 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -30,13 +30,12 @@
 #include "arrow/status.h"
 #include "plasma/compat.h"
 
+namespace plasma {
+
 // TODO(pcm): Replace our own custom message header (message type,
 // message length, plasma protocol verion) with one that is serialized
 // using flatbuffers.
-#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
-#define DISCONNECT_CLIENT 0
-
-namespace plasma {
+constexpr int64_t kPlasmaProtocolVersion = 0x0000000000000000;
 
 using arrow::Status;
 
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 4b4064c..a1d6e99 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -36,6 +36,7 @@
 
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
 #include "plasma/common.h"
 #include "plasma/common_generated.h"
 
@@ -65,7 +66,7 @@ namespace plasma {
   } while (0);
 
 /// Allocation granularity used in plasma for object allocation.
-#define BLOCK_SIZE 64
+constexpr int64_t kBlockSize = 64;
 
 struct Client;
 
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 2caa3cb..c06ad6a 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -170,7 +170,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
     // 64-byte aligned, but in practice it often will be.
     if (device_num == 0) {
       pointer =
-          reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size + metadata_size));
+          reinterpret_cast<uint8_t*>(dlmemalign(kBlockSize, data_size + metadata_size));
       if (pointer == NULL) {
         // Tell the eviction policy how much space we need to create this object.
         std::vector<ObjectID> objects_to_evict;
@@ -741,7 +741,7 @@ Status PlasmaStore::process_message(Client* client) {
       HANDLE_SIGPIPE(SendConnectReply(client->fd, store_info_.memory_capacity),
                      client->fd);
     } break;
-    case DISCONNECT_CLIENT:
+    case MessageType_PlasmaDisconnectClient:
       ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
       disconnect_client(client->fd);
       break;
@@ -768,7 +768,7 @@ class PlasmaStoreRunner {
     // achieve that by mallocing and freeing a single large amount of space.
     // that maximum allowed size up front.
     if (use_one_memory_mapped_file) {
-      void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory);
+      void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
       ARROW_CHECK(pointer != NULL);
       plasma::dlfree(pointer);
     }
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index dad7688..53743ca 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -59,10 +59,8 @@ class TestPlasmaStore : public ::testing::Test {
                                  "/plasma_store -m 1000000000 -s /tmp/store" +
                                  store_index + " 1> /dev/null 2> /dev/null &";
     system(plasma_command.c_str());
-    ARROW_CHECK_OK(
-        client_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
-    ARROW_CHECK_OK(
-        client2_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
+    ARROW_CHECK_OK(client_.Connect("/tmp/store" + store_index, ""));
+    ARROW_CHECK_OK(client2_.Connect("/tmp/store" + store_index, ""));
   }
   virtual void TearDown() {
     ARROW_CHECK_OK(client_.Disconnect());

-- 
To stop receiving notification emails like this one, please contact
uwe@apache.org.