You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/03 22:28:58 UTC

[arrow] branch master updated: ARROW-3199: [Plasma] File descriptor send and receive retries

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 67d9264  ARROW-3199: [Plasma] File descriptor send and receive retries
67d9264 is described below

commit 67d92642900ec29a41fa67b720e5b77bfa755e2d
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Mon Dec 3 14:28:49 2018 -0800

    ARROW-3199: [Plasma] File descriptor send and receive retries
    
    An additional pair of eyes would be appreciated for this. It seems to solve the issue reported in the JIRA, but I'm not sure what the semantics of partial reads/writes are here (in particular, how are partial reads/writes handled for ancillary data like file descriptors?).
    
    Found by @stephanie-wang (cc).
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #2551 from pcmoritz/plasma-retries and squashes the following commits:
    
    c7ca3b700 <Philipp Moritz> fix
    91061adc5 <Philipp Moritz> linting
    fda4dd27d <Philipp Moritz> move retry code
    5dbabd79a <Philipp Moritz> fix travis
    bc876dc29 <Philipp Moritz> fix compile errors
    e1580fe81 <Philipp Moritz> plasma file descriptor send and receive retries
---
 cpp/src/plasma/fling.cc | 47 +++++++++++++++++++++++++++++++++++++++++------
 cpp/src/plasma/store.cc | 17 +----------------
 2 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/cpp/src/plasma/fling.cc b/cpp/src/plasma/fling.cc
index 26afd87..f0960aa 100644
--- a/cpp/src/plasma/fling.cc
+++ b/cpp/src/plasma/fling.cc
@@ -16,6 +16,8 @@
 
 #include <string.h>
 
+#include "arrow/util/logging.h"
+
 void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
   iov->iov_base = buf;
   iov->iov_len = 1;
@@ -46,11 +48,32 @@ int send_fd(int conn, int fd) {
   memcpy(CMSG_DATA(header), reinterpret_cast<void*>(&fd), sizeof(int));
 
   // Send file descriptor.
-  ssize_t r = sendmsg(conn, &msg, 0);
-  if (r >= 0) {
-    return 0;
-  } else {
-    return static_cast<int>(r);
+  while (true) {
+    ssize_t r = sendmsg(conn, &msg, 0);
+    if (r < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+        continue;
+      } else if (errno == EMSGSIZE) {
+        ARROW_LOG(WARNING) << "Failed to send file descriptor"
+                           << " (errno = EMSGSIZE), retrying.";
+        // If we failed to send the file descriptor, loop until we have sent it
+        // successfully. TODO(rkn): This is problematic for two reasons. First
+        // of all, sending the file descriptor should just succeed without any
+        // errors, but sometimes I see a "Message too long" error number.
+        // Second, looping like this allows a client to potentially block the
+        // plasma store event loop which should never happen.
+        continue;
+      } else {
+        ARROW_LOG(INFO) << "Error in send_fd (errno = " << errno << ")";
+        return static_cast<int>(r);
+      }
+    } else if (r == 0) {
+      ARROW_LOG(INFO) << "Encountered unexpected EOF";
+      return 0;
+    } else {
+      ARROW_CHECK(r > 0);
+      return static_cast<int>(r);
+    }
   }
 }
 
@@ -60,7 +83,19 @@ int recv_fd(int conn) {
   char buf[CMSG_SPACE(sizeof(int))];
   init_msg(&msg, &iov, buf, sizeof(buf));
 
-  if (recvmsg(conn, &msg, 0) == -1) return -1;
+  while (true) {
+    ssize_t r = recvmsg(conn, &msg, 0);
+    if (r == -1) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+        continue;
+      } else {
+        ARROW_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")";
+        return -1;
+      }
+    } else {
+      break;
+    }
+  }
 
   int found_fd = -1;
   int oh_noes = 0;
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 28624d0..bb99f59 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -327,22 +327,7 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
   if (s.ok()) {
     // Send all of the file descriptors for the present objects.
     for (int store_fd : store_fds) {
-      int error_code = send_fd(get_req->client->fd, store_fd);
-      // If we failed to send the file descriptor, loop until we have sent it
-      // successfully. TODO(rkn): This is problematic for two reasons. First
-      // of all, sending the file descriptor should just succeed without any
-      // errors, but sometimes I see a "Message too long" error number.
-      // Second, looping like this allows a client to potentially block the
-      // plasma store event loop which should never happen.
-      while (error_code < 0) {
-        if (errno == EMSGSIZE) {
-          ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
-          error_code = send_fd(get_req->client->fd, store_fd);
-          continue;
-        }
-        WarnIfSigpipe(error_code, get_req->client->fd);
-        break;
-      }
+      WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
     }
   }