You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/04/24 22:37:41 UTC

svn commit: r1471657 - in /incubator/mesos/trunk/third_party/libprocess: include/process/socket.hpp src/encoder.hpp src/process.cpp

Author: benh
Date: Wed Apr 24 20:37:41 2013
New Revision: 1471657

URL: http://svn.apache.org/r1471657
Log:
Added a Socket reference to Encoder.

Review: https://reviews.apache.org/r/10742

Modified:
    incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp
    incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp?rev=1471657&r1=1471656&r2=1471657&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp Wed Apr 24 20:37:41 2013
@@ -15,7 +15,7 @@ public:
   Socket()
     : refs(new int(1)), s(-1) {}
 
-  Socket(int _s)
+  explicit Socket(int _s)
     : refs(new int(1)), s(_s) {}
 
   ~Socket()
@@ -62,7 +62,10 @@ private:
     if (__sync_sub_and_fetch(refs, 1) == 0) {
       delete refs;
       if (s >= 0) {
-        close(s);
+        if (close(s) != 0) {
+          perror("Failed to close socket");
+          abort();
+        }
       }
     }
   }

Modified: incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp?rev=1471657&r1=1471656&r2=1471657&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp Wed Apr 24 20:37:41 2013
@@ -28,16 +28,26 @@ extern void send_file(struct ev_loop*, e
 class Encoder
 {
 public:
+  Encoder(const Socket& _s) : s(_s) {}
   virtual ~Encoder() {}
+
   virtual Sender sender() = 0;
+
+  Socket socket() const
+  {
+    return s;
+  }
+
+private:
+  const Socket s; // The socket this encoder is associated with.
 };
 
 
 class DataEncoder : public Encoder
 {
 public:
-  DataEncoder(const std::string& _data)
-    : data(_data), index(0) {}
+  DataEncoder(const Socket& s, const std::string& _data)
+    : Encoder(s), data(_data), index(0) {}
 
   virtual ~DataEncoder() {}
 
@@ -75,8 +85,8 @@ private:
 class MessageEncoder : public DataEncoder
 {
 public:
-  MessageEncoder(Message* _message)
-    : DataEncoder(encode(_message)), message(_message) {}
+  MessageEncoder(const Socket& s, Message* _message)
+    : DataEncoder(s, encode(_message)), message(_message) {}
 
   virtual ~MessageEncoder()
   {
@@ -119,9 +129,10 @@ class HttpResponseEncoder : public DataE
 {
 public:
   HttpResponseEncoder(
+      const Socket& s,
       const http::Response& response,
       const http::Request& request)
-    : DataEncoder(encode(response, request)) {}
+    : DataEncoder(s, encode(response, request)) {}
 
   static std::string encode(
       const http::Response& response,
@@ -202,8 +213,8 @@ public:
 class FileEncoder : public Encoder
 {
 public:
-  FileEncoder(int _fd, size_t _size)
-    : fd(_fd), size(_size), index(0) {}
+  FileEncoder(const Socket& s, int _fd, size_t _size)
+    : Encoder(s), fd(_fd), size(_size), index(0) {}
 
   virtual ~FileEncoder()
   {

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1471657&r1=1471656&r2=1471657&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Wed Apr 24 20:37:41 2013
@@ -310,12 +310,12 @@ public:
 
   void link(ProcessBase* process, const UPID& to);
 
-  PID<HttpProxy> proxy(int s);
+  PID<HttpProxy> proxy(const Socket& socket);
 
-  void send(Encoder* encoder, int s, bool persist);
+  void send(Encoder* encoder, bool persist);
   void send(const Response& response,
             const Request& request,
-            int s);
+            const Socket& socket);
   void send(Message* message);
 
   Encoder* next(int s);
@@ -1560,12 +1560,14 @@ bool HttpProxy::process(const Future<Res
 
         // TODO(benh): Consider a way to have the socket manager turn
         // on TCP_CORK for both sends and then turn it off.
-        Encoder* encoder = new HttpResponseEncoder(response, request);
-        socket_manager->send(encoder, socket, true);
+        socket_manager->send(
+            new HttpResponseEncoder(socket, response, request),
+            true);
 
         // Note the file descriptor gets closed by FileEncoder.
-        encoder = new FileEncoder(fd, s.st_size);
-        socket_manager->send(encoder, socket, request.keepAlive);
+        socket_manager->send(
+            new FileEncoder(socket, fd, s.st_size),
+            request.keepAlive);
       }
     }
   } else if (response.type == Response::PIPE) {
@@ -1588,8 +1590,9 @@ bool HttpProxy::process(const Future<Res
 
     VLOG(1) << "Starting \"chunked\" streaming";
 
-    Encoder* encoder = new HttpResponseEncoder(response, request);
-    socket_manager->send(encoder, socket, true);
+    socket_manager->send(
+        new HttpResponseEncoder(socket, response, request),
+        true);
 
     pipe = response.pipe;
 
@@ -1649,9 +1652,8 @@ void HttpProxy::stream(const Future<shor
         // We always persist the connection when we're not finished
         // streaming.
         socket_manager->send(
-          new DataEncoder(out.str()),
-          socket,
-          finished ? request.keepAlive : true);
+            new DataEncoder(socket, out.str()),
+            finished ? request.keepAlive : true);
       }
     }
   } else if (poll.isFailed()) {
@@ -1724,28 +1726,26 @@ void SocketManager::link(ProcessBase* pr
         LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
       }
 
-      Socket socket = Socket(s);
-
-      sockets[s] = socket;
+      sockets[s] = Socket(s);
       nodes[s] = node;
 
       persists[node] = s;
 
-      sockaddr_in addr;
-      memset(&addr, 0, sizeof(addr));
-      addr.sin_family = PF_INET;
-      addr.sin_port = htons(to.port);
-      addr.sin_addr.s_addr = to.ip;
-
       // Allocate and initialize the decoder and watcher (we really
       // only "receive" on this socket so that we can react when it
       // gets closed and generate appropriate lost events).
-      DataDecoder* decoder = new DataDecoder(socket);
+      DataDecoder* decoder = new DataDecoder(sockets[s]);
 
       ev_io* watcher = new ev_io();
       watcher->data = decoder;
 
       // Try and connect to the node using this socket.
+      sockaddr_in addr;
+      memset(&addr, 0, sizeof(addr));
+      addr.sin_family = PF_INET;
+      addr.sin_port = htons(to.port);
+      addr.sin_addr.s_addr = to.ip;
+
       if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
         if (errno != EINPROGRESS) {
           PLOG(FATAL) << "Failed to link, connect";
@@ -1771,7 +1771,7 @@ void SocketManager::link(ProcessBase* pr
 }
 
 
-PID<HttpProxy> SocketManager::proxy(int s)
+PID<HttpProxy> SocketManager::proxy(const Socket& socket)
 {
   HttpProxy* proxy = NULL;
 
@@ -1779,12 +1779,12 @@ PID<HttpProxy> SocketManager::proxy(int 
     // This socket might have been asked to get closed (e.g., remote
     // side hang up) while a process is attempting to handle an HTTP
     // request. Thus, if there is no more socket, return an empty PID.
-    if (sockets.count(s) > 0) {
-      if (proxies.count(s) > 0) {
-        return proxies[s]->self();
+    if (sockets.count(socket) > 0) {
+      if (proxies.count(socket) > 0) {
+        return proxies[socket]->self();
       } else {
-        proxy = new HttpProxy(sockets[s]);
-        proxies[s] = proxy;
+        proxy = new HttpProxy(sockets[socket]);
+        proxies[socket] = proxy;
       }
     }
   }
@@ -1803,29 +1803,29 @@ PID<HttpProxy> SocketManager::proxy(int 
 }
 
 
-void SocketManager::send(Encoder* encoder, int s, bool persist)
+void SocketManager::send(Encoder* encoder, bool persist)
 {
   CHECK(encoder != NULL);
 
   synchronized (this) {
-    if (sockets.count(s) > 0) {
+    if (sockets.count(encoder->socket()) > 0) {
       // Update whether or not this socket should get disposed after
       // there is no more data to send.
       if (!persist) {
-        dispose.insert(s);
+        dispose.insert(encoder->socket());
       }
 
-      if (outgoing.count(s) > 0) {
-        outgoing[s].push(encoder);
+      if (outgoing.count(encoder->socket()) > 0) {
+        outgoing[encoder->socket()].push(encoder);
       } else {
         // Initialize the outgoing queue.
-        outgoing[s];
+        outgoing[encoder->socket()];
 
         // Allocate and initialize the watcher.
         ev_io* watcher = new ev_io();
         watcher->data = encoder;
 
-        ev_io_init(watcher, encoder->sender(), s, EV_WRITE);
+        ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
 
         synchronized (watchers) {
           watchers->push(watcher);
@@ -1844,7 +1844,7 @@ void SocketManager::send(Encoder* encode
 void SocketManager::send(
     const Response& response,
     const Request& request,
-    int s)
+    const Socket& socket)
 {
   bool persist = request.keepAlive;
 
@@ -1856,7 +1856,7 @@ void SocketManager::send(
     }
   }
 
-  send(new HttpResponseEncoder(response, request), s, persist);
+  send(new HttpResponseEncoder(socket, response, request), persist);
 }
 
 
@@ -1864,8 +1864,6 @@ void SocketManager::send(Message* messag
 {
   CHECK(message != NULL);
 
-  DataEncoder* encoder = new MessageEncoder(message);
-
   Node node(message->to.ip, message->to.port);
 
   synchronized (this) {
@@ -1874,7 +1872,8 @@ void SocketManager::send(Message* messag
     bool temp = temps.count(node) > 0;
     if (persist || temp) {
       int s = persist ? persists[node] : temps[node];
-      send(encoder, s, persist);
+      CHECK(sockets.count(s) > 0);
+      send(new MessageEncoder(sockets[s], message), persist);
     } else {
       // No peristant or temporary socket to the node currently
       // exists, so we create a temporary one.
@@ -1903,6 +1902,10 @@ void SocketManager::send(Message* messag
       // Initialize the outgoing queue.
       outgoing[s];
 
+      // Allocate and initialize the watcher.
+      ev_io* watcher = new ev_io();
+      watcher->data = new MessageEncoder(sockets[s], message);
+
       // Try and connect to the node using this socket.
       sockaddr_in addr;
       memset(&addr, 0, sizeof(addr));
@@ -1910,10 +1913,6 @@ void SocketManager::send(Message* messag
       addr.sin_port = htons(message->to.port);
       addr.sin_addr.s_addr = message->to.ip;
 
-      // Allocate and initialize the watcher.
-      ev_io* watcher = new ev_io();
-      watcher->data = encoder;
-
       if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
         if (errno != EINPROGRESS) {
           PLOG(FATAL) << "Failed to send, connect";
@@ -3153,9 +3152,9 @@ void read(int fd,
     return;
   }
 
-  // Since promise->future() will be discarded before future is discarded, we
-  // should never see a discarded future here because of the check in the
-  // beginning of this function.
+  // Since promise->future() will be discarded before future is
+  // discarded, we should never see a discarded future here because of
+  // the check in the beginning of this function.
   CHECK(!future.isDiscarded());
 
   if (future.isFailed()) {
@@ -3218,7 +3217,7 @@ Future<size_t> read(int fd, void* data, 
   Try<bool> nonblock = os::isNonblock(fd);
   if (nonblock.isError()) {
     // The file descriptor is not valid (e.g. fd has been closed).
-    promise->fail(strerror(errno));
+    promise->fail(string("Failed to check O_NONBLOCK") + strerror(errno));
     return promise->future();
   } else if (!nonblock.get()) {
     // The fd is not opened with O_NONBLOCK set.
@@ -3254,8 +3253,7 @@ Future<string> _read(int fd,
   return io::read(fd, data.get(), length)
     .then([=] (size_t size) {
       if (size == 0) { // EOF.
-        string result(*buffer);
-        return Future<string>(result);
+        return string(*buffer);
       }
       buffer->append(data, size);
       return _read(fd, buffer, data, length);
@@ -3278,11 +3276,11 @@ Future<string> __read(
     size_t length)
 {
   if (size == 0) { // EOF.
-    string result(*buffer);
-    return Future<string>(result);
+    return string(*buffer);
   }
 
   buffer->append(data.get(), size);
+
   return _read(fd, buffer, data, length);
 }
 
@@ -3367,7 +3365,8 @@ Future<Response> get(const UPID& upid, c
   addr.sin_addr.s_addr = upid.ip;
 
   if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
-    return Future<Response>::failed(strerror(errno));
+    return Future<Response>::failed(
+        string("Failed to connect: ") + strerror(errno));
   }
 
   std::ostringstream out;
@@ -3388,7 +3387,8 @@ Future<Response> get(const UPID& upid, c
       if (errno == EINTR) {
         continue;
       }
-      return Future<Response>::failed(strerror(errno));
+      return Future<Response>::failed(
+          string("Failed to write: ") + strerror(errno));
     }
 
     remaining -= n;