You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/11/29 21:29:38 UTC

[05/14] mesos git commit: Removed `Socket` dependency on `Socket::Impl`.

Removed `Socket` dependency on `Socket::Impl`.

Review: https://reviews.apache.org/r/53458


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aa68496
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aa68496
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aa68496

Branch: refs/heads/master
Commit: 7aa68496df050ed1fc7be8aed0f9b46230039c3a
Parents: 5c093c8
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 2 22:33:31 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp  | 26 -----------------
 3rdparty/libprocess/src/libevent_ssl_socket.cpp | 30 ++++++++++----------
 3rdparty/libprocess/src/libevent_ssl_socket.hpp |  6 ++--
 3rdparty/libprocess/src/poll_socket.cpp         | 30 +++++++++++---------
 3rdparty/libprocess/src/socket.cpp              | 20 ++++++-------
 5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 97bdcb9..a70954a 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -202,27 +202,6 @@ public:
 
     virtual Socket::Kind kind() const = 0;
 
-    /**
-     * Construct a new `Socket` from the given impl.
-     *
-     * This is a proxy function, as `Impl`s derived from this won't have
-     * access to the Socket::Socket(...) constructors.
-     */
-    // TODO(jmlvanre): These should be protected; however, gcc complains
-    // when using them from within a lambda of a derived class.
-    static Socket socket(std::shared_ptr<Impl>&& that)
-    {
-      return Socket(std::move(that));
-    }
-
-    /**
-     * @copydoc process::network::Socket::Impl::socket
-     */
-    static Socket socket(const std::shared_ptr<Impl>& that)
-    {
-      return Socket(that);
-    }
-
   protected:
     explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
@@ -240,11 +219,6 @@ public:
     }
 
     /**
-     * Construct a `Socket` wrapper from this implementation.
-     */
-    Socket socket() { return Socket(shared_from_this()); }
-
-    /**
      * Returns a `std::shared_ptr<T>` from this implementation.
      */
     template <typename T>

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 5c0929d..9cade79 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -31,6 +31,7 @@
 #include "libevent.hpp"
 #include "libevent_ssl_socket.hpp"
 #include "openssl.hpp"
+#include "poll_socket.hpp"
 
 // Locking:
 //
@@ -898,14 +899,15 @@ Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
 }
 
 
-Future<Socket> LibeventSSLSocketImpl::accept()
+Future<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::accept()
 {
   // We explicitly specify the return type to avoid a type deduction
   // issue in some versions of clang. See MESOS-2943.
   return accept_queue.get()
-    .then([](const Future<Socket>& socket) -> Future<Socket> {
-      CHECK(!socket.isPending());
-      return socket;
+    .then([](const Future<std::shared_ptr<Socket::Impl>>& impl)
+      -> Future<std::shared_ptr<Socket::Impl>> {
+      CHECK(!impl.isPending());
+      return impl;
     });
 }
 
@@ -964,12 +966,12 @@ void LibeventSSLSocketImpl::peek_callback(
   if (ssl) {
     accept_SSL_callback(request);
   } else {
-    // Downgrade to a non-SSL socket.
-    Try<Socket> create = Socket::create(Socket::POLL, fd);
-    if (create.isError()) {
-      request->promise.fail(create.error());
+    // Downgrade to a non-SSL socket implementation.
+    Try<std::shared_ptr<Socket::Impl>> impl = PollSocketImpl::create(fd);
+    if (impl.isError()) {
+      request->promise.fail(impl.error());
     } else {
-      request->promise.set(create.get());
+      request->promise.set(impl.get());
     }
 
     delete request;
@@ -981,14 +983,14 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
 {
   CHECK(__in_event_loop__);
 
-  Queue<Future<Socket>> accept_queue_ = accept_queue;
+  Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue_ = accept_queue;
 
   // After the socket is accepted, it must complete the SSL
   // handshake (or be downgraded to a regular socket) before
   // we put it in the queue of connected sockets.
   request->promise.future()
-    .onAny([accept_queue_](Future<Socket> socket) mutable {
-      accept_queue_.put(socket);
+    .onAny([accept_queue_](Future<std::shared_ptr<Socket::Impl>> impl) mutable {
+      accept_queue_.put(impl);
     });
 
   // If we support downgrading the connection, first wait for this
@@ -1119,9 +1121,7 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
               &LibeventSSLSocketImpl::event_callback,
               CHECK_NOTNULL(impl->event_loop_handle));
 
-          Socket socket = Socket::Impl::socket(std::move(impl));
-
-          request->promise.set(socket);
+          request->promise.set(std::dynamic_pointer_cast<Socket::Impl>(impl));
         } else if (events & BEV_EVENT_ERROR) {
           std::ostringstream stream;
           if (EVUTIL_SOCKET_ERROR() != 0) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index acb00d4..ed53976 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -45,7 +45,7 @@ public:
   virtual Future<size_t> send(const char* data, size_t size);
   virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
   virtual Try<Nothing> listen(int backlog);
-  virtual Future<Socket> accept();
+  virtual Future<std::shared_ptr<Socket::Impl>> accept();
   virtual Socket::Kind kind() const { return Socket::SSL; }
 
   // This call is used to do the equivalent of shutting down the read
@@ -74,7 +74,7 @@ private:
         socket(_socket),
         ip(_ip) {}
     event* peek_event;
-    Promise<Socket> promise;
+    Promise<std::shared_ptr<Socket::Impl>> promise;
     evconnlistener* listener;
     int socket;
     Option<net::IP> ip;
@@ -175,7 +175,7 @@ private:
   // downgraded). The 'accept()' call returns sockets from this queue.
   // We wrap the socket in a 'Future' so that we can pass failures or
   // discards through.
-  Queue<Future<Socket>> accept_queue;
+  Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue;
 
   Option<std::string> peer_hostname;
   Option<net::IP> peer_ip;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index d04f048..ff06e56 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -114,12 +114,14 @@ Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
 
 namespace internal {
 
-Future<Nothing> connect(const Socket& socket, const Address& to)
+Future<Nothing> connect(
+    const std::shared_ptr<PollSocketImpl>& socket,
+    const Address& to)
 {
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
-  int s = socket.get();
+  int s = socket->get();
 
   // NOTE: We cast to `char*` here because the function prototypes on Windows
   // use `char*` instead of `void*`.
@@ -149,7 +151,7 @@ Future<Nothing> PollSocketImpl::connect(const Address& address)
   if (connect.isError()) {
     if (net::is_inprogress_error(connect.error().code)) {
       return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, socket(), address));
+        .then(lambda::bind(&internal::connect, shared(this), address));
     }
 
     return Failure(connect.error());
@@ -167,12 +169,14 @@ Future<size_t> PollSocketImpl::recv(char* data, size_t size)
 
 namespace internal {
 
-Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
+Future<size_t> socket_send_data(
+    const std::shared_ptr<PollSocketImpl>& impl,
+    const char* data, size_t size)
 {
   CHECK(size > 0);
 
   while (true) {
-    ssize_t length = net::send(socket.get(), data, size, MSG_NOSIGNAL);
+    ssize_t length = net::send(impl->get(), data, size, MSG_NOSIGNAL);
 
 #ifdef __WINDOWS__
     int error = WSAGetLastError();
@@ -185,8 +189,8 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
       continue;
     } else if (length < 0 && net::is_retryable_error(error)) {
       // Might block, try again later.
-      return io::poll(socket.get(), io::WRITE)
-        .then(lambda::bind(&internal::socket_send_data, socket, data, size));
+      return io::poll(impl->get(), io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, impl, data, size));
     } else if (length <= 0) {
       // Socket error or closed.
       if (length < 0) {
@@ -207,7 +211,7 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
 
 
 Future<size_t> socket_send_file(
-    Socket socket,
+    const std::shared_ptr<PollSocketImpl>& impl,
     int fd,
     off_t offset,
     size_t size)
@@ -216,7 +220,7 @@ Future<size_t> socket_send_file(
 
   while (true) {
     Try<ssize_t, SocketError> length =
-      os::sendfile(socket.get(), fd, offset, size);
+      os::sendfile(impl->get(), fd, offset, size);
 
     if (length.isSome()) {
       CHECK(length.get() >= 0);
@@ -232,10 +236,10 @@ Future<size_t> socket_send_file(
       continue;
     } else if (net::is_retryable_error(length.error().code)) {
       // Might block, try again later.
-      return io::poll(socket.get(), io::WRITE)
+      return io::poll(impl->get(), io::WRITE)
         .then(lambda::bind(
             &internal::socket_send_file,
-            socket,
+            impl,
             fd,
             offset,
             size));
@@ -255,7 +259,7 @@ Future<size_t> PollSocketImpl::send(const char* data, size_t size)
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(
         &internal::socket_send_data,
-        socket(),
+        shared(this),
         data,
         size));
 }
@@ -266,7 +270,7 @@ Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(
         &internal::socket_send_file,
-        socket(),
+        shared(this),
         fd,
         offset,
         size));

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 7f93168..8f372aa 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -143,7 +143,7 @@ Try<Address> Socket::Impl::bind(const Address& address)
 
 
 static Future<string> _recv(
-    Socket socket,
+    const std::shared_ptr<Socket::Impl>& impl,
     const Option<ssize_t>& size,
     Owned<string> buffer,
     size_t chunk,
@@ -165,9 +165,9 @@ static Future<string> _recv(
     // We've been asked to receive until EOF so keep receiving since
     // according to the 'length == 0' check above we haven't reached
     // EOF yet.
-    return socket.recv(data.get(), chunk)
+    return impl->recv(data.get(), chunk)
       .then(lambda::bind(&_recv,
-                         socket,
+                         impl,
                          size,
                          buffer,
                          chunk,
@@ -176,9 +176,9 @@ static Future<string> _recv(
   } else if (static_cast<string::size_type>(size.get()) > buffer->size()) {
     // We've been asked to receive a particular amount of data and we
     // haven't yet received that much data so keep receiving.
-    return socket.recv(data.get(), size.get() - buffer->size())
+    return impl->recv(data.get(), size.get() - buffer->size())
       .then(lambda::bind(&_recv,
-                         socket,
+                         impl,
                          size,
                          buffer,
                          chunk,
@@ -206,7 +206,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
 
   return recv(data.get(), chunk)
     .then(lambda::bind(&_recv,
-                       socket(),
+                       shared_from_this(),
                        size,
                        buffer,
                        chunk,
@@ -216,7 +216,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
 
 
 static Future<Nothing> _send(
-    Socket socket,
+    const std::shared_ptr<Socket::Impl>& impl,
     Owned<string> data,
     size_t index,
     size_t length)
@@ -230,8 +230,8 @@ static Future<Nothing> _send(
   }
 
   // Keep sending!
-  return socket.send(data->data() + index, data->size() - index)
-    .then(lambda::bind(&_send, socket, data, index, lambda::_1));
+  return impl->send(data->data() + index, data->size() - index)
+    .then(lambda::bind(&_send, impl, data, index, lambda::_1));
 }
 
 
@@ -240,7 +240,7 @@ Future<Nothing> Socket::Impl::send(const string& _data)
   Owned<string> data(new string(_data));
 
   return send(data->data(), data->size())
-    .then(lambda::bind(&_send, socket(), data, 0, lambda::_1));
+    .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
 }