You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/11/29 21:29:38 UTC
[05/14] mesos git commit: Removed `Socket` dependency on
`Socket::Impl`.
Removed `Socket` dependency on `Socket::Impl`.
Review: https://reviews.apache.org/r/53458
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aa68496
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aa68496
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aa68496
Branch: refs/heads/master
Commit: 7aa68496df050ed1fc7be8aed0f9b46230039c3a
Parents: 5c093c8
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 2 22:33:31 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 26 -----------------
3rdparty/libprocess/src/libevent_ssl_socket.cpp | 30 ++++++++++----------
3rdparty/libprocess/src/libevent_ssl_socket.hpp | 6 ++--
3rdparty/libprocess/src/poll_socket.cpp | 30 +++++++++++---------
3rdparty/libprocess/src/socket.cpp | 20 ++++++-------
5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 97bdcb9..a70954a 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -202,27 +202,6 @@ public:
virtual Socket::Kind kind() const = 0;
- /**
- * Construct a new `Socket` from the given impl.
- *
- * This is a proxy function, as `Impl`s derived from this won't have
- * access to the Socket::Socket(...) constructors.
- */
- // TODO(jmlvanre): These should be protected; however, gcc complains
- // when using them from within a lambda of a derived class.
- static Socket socket(std::shared_ptr<Impl>&& that)
- {
- return Socket(std::move(that));
- }
-
- /**
- * @copydoc process::network::Socket::Impl::socket
- */
- static Socket socket(const std::shared_ptr<Impl>& that)
- {
- return Socket(that);
- }
-
protected:
explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
@@ -240,11 +219,6 @@ public:
}
/**
- * Construct a `Socket` wrapper from this implementation.
- */
- Socket socket() { return Socket(shared_from_this()); }
-
- /**
* Returns a `std::shared_ptr<T>` from this implementation.
*/
template <typename T>
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 5c0929d..9cade79 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -31,6 +31,7 @@
#include "libevent.hpp"
#include "libevent_ssl_socket.hpp"
#include "openssl.hpp"
+#include "poll_socket.hpp"
// Locking:
//
@@ -898,14 +899,15 @@ Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
}
-Future<Socket> LibeventSSLSocketImpl::accept()
+Future<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::accept()
{
// We explicitly specify the return type to avoid a type deduction
// issue in some versions of clang. See MESOS-2943.
return accept_queue.get()
- .then([](const Future<Socket>& socket) -> Future<Socket> {
- CHECK(!socket.isPending());
- return socket;
+ .then([](const Future<std::shared_ptr<Socket::Impl>>& impl)
+ -> Future<std::shared_ptr<Socket::Impl>> {
+ CHECK(!impl.isPending());
+ return impl;
});
}
@@ -964,12 +966,12 @@ void LibeventSSLSocketImpl::peek_callback(
if (ssl) {
accept_SSL_callback(request);
} else {
- // Downgrade to a non-SSL socket.
- Try<Socket> create = Socket::create(Socket::POLL, fd);
- if (create.isError()) {
- request->promise.fail(create.error());
+ // Downgrade to a non-SSL socket implementation.
+ Try<std::shared_ptr<Socket::Impl>> impl = PollSocketImpl::create(fd);
+ if (impl.isError()) {
+ request->promise.fail(impl.error());
} else {
- request->promise.set(create.get());
+ request->promise.set(impl.get());
}
delete request;
@@ -981,14 +983,14 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
{
CHECK(__in_event_loop__);
- Queue<Future<Socket>> accept_queue_ = accept_queue;
+ Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue_ = accept_queue;
// After the socket is accepted, it must complete the SSL
// handshake (or be downgraded to a regular socket) before
// we put it in the queue of connected sockets.
request->promise.future()
- .onAny([accept_queue_](Future<Socket> socket) mutable {
- accept_queue_.put(socket);
+ .onAny([accept_queue_](Future<std::shared_ptr<Socket::Impl>> impl) mutable {
+ accept_queue_.put(impl);
});
// If we support downgrading the connection, first wait for this
@@ -1119,9 +1121,7 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
&LibeventSSLSocketImpl::event_callback,
CHECK_NOTNULL(impl->event_loop_handle));
- Socket socket = Socket::Impl::socket(std::move(impl));
-
- request->promise.set(socket);
+ request->promise.set(std::dynamic_pointer_cast<Socket::Impl>(impl));
} else if (events & BEV_EVENT_ERROR) {
std::ostringstream stream;
if (EVUTIL_SOCKET_ERROR() != 0) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index acb00d4..ed53976 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -45,7 +45,7 @@ public:
virtual Future<size_t> send(const char* data, size_t size);
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
virtual Try<Nothing> listen(int backlog);
- virtual Future<Socket> accept();
+ virtual Future<std::shared_ptr<Socket::Impl>> accept();
virtual Socket::Kind kind() const { return Socket::SSL; }
// This call is used to do the equivalent of shutting down the read
@@ -74,7 +74,7 @@ private:
socket(_socket),
ip(_ip) {}
event* peek_event;
- Promise<Socket> promise;
+ Promise<std::shared_ptr<Socket::Impl>> promise;
evconnlistener* listener;
int socket;
Option<net::IP> ip;
@@ -175,7 +175,7 @@ private:
// downgraded). The 'accept()' call returns sockets from this queue.
// We wrap the socket in a 'Future' so that we can pass failures or
// discards through.
- Queue<Future<Socket>> accept_queue;
+ Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue;
Option<std::string> peer_hostname;
Option<net::IP> peer_ip;
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index d04f048..ff06e56 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -114,12 +114,14 @@ Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
namespace internal {
-Future<Nothing> connect(const Socket& socket, const Address& to)
+Future<Nothing> connect(
+ const std::shared_ptr<PollSocketImpl>& socket,
+ const Address& to)
{
// Now check that a successful connection was made.
int opt;
socklen_t optlen = sizeof(opt);
- int s = socket.get();
+ int s = socket->get();
// NOTE: We cast to `char*` here because the function prototypes on Windows
// use `char*` instead of `void*`.
@@ -149,7 +151,7 @@ Future<Nothing> PollSocketImpl::connect(const Address& address)
if (connect.isError()) {
if (net::is_inprogress_error(connect.error().code)) {
return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, socket(), address));
+ .then(lambda::bind(&internal::connect, shared(this), address));
}
return Failure(connect.error());
@@ -167,12 +169,14 @@ Future<size_t> PollSocketImpl::recv(char* data, size_t size)
namespace internal {
-Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
+Future<size_t> socket_send_data(
+ const std::shared_ptr<PollSocketImpl>& impl,
+ const char* data, size_t size)
{
CHECK(size > 0);
while (true) {
- ssize_t length = net::send(socket.get(), data, size, MSG_NOSIGNAL);
+ ssize_t length = net::send(impl->get(), data, size, MSG_NOSIGNAL);
#ifdef __WINDOWS__
int error = WSAGetLastError();
@@ -185,8 +189,8 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
continue;
} else if (length < 0 && net::is_retryable_error(error)) {
// Might block, try again later.
- return io::poll(socket.get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, socket, data, size));
+ return io::poll(impl->get(), io::WRITE)
+ .then(lambda::bind(&internal::socket_send_data, impl, data, size));
} else if (length <= 0) {
// Socket error or closed.
if (length < 0) {
@@ -207,7 +211,7 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
Future<size_t> socket_send_file(
- Socket socket,
+ const std::shared_ptr<PollSocketImpl>& impl,
int fd,
off_t offset,
size_t size)
@@ -216,7 +220,7 @@ Future<size_t> socket_send_file(
while (true) {
Try<ssize_t, SocketError> length =
- os::sendfile(socket.get(), fd, offset, size);
+ os::sendfile(impl->get(), fd, offset, size);
if (length.isSome()) {
CHECK(length.get() >= 0);
@@ -232,10 +236,10 @@ Future<size_t> socket_send_file(
continue;
} else if (net::is_retryable_error(length.error().code)) {
// Might block, try again later.
- return io::poll(socket.get(), io::WRITE)
+ return io::poll(impl->get(), io::WRITE)
.then(lambda::bind(
&internal::socket_send_file,
- socket,
+ impl,
fd,
offset,
size));
@@ -255,7 +259,7 @@ Future<size_t> PollSocketImpl::send(const char* data, size_t size)
return io::poll(get(), io::WRITE)
.then(lambda::bind(
&internal::socket_send_data,
- socket(),
+ shared(this),
data,
size));
}
@@ -266,7 +270,7 @@ Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
return io::poll(get(), io::WRITE)
.then(lambda::bind(
&internal::socket_send_file,
- socket(),
+ shared(this),
fd,
offset,
size));
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 7f93168..8f372aa 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -143,7 +143,7 @@ Try<Address> Socket::Impl::bind(const Address& address)
static Future<string> _recv(
- Socket socket,
+ const std::shared_ptr<Socket::Impl>& impl,
const Option<ssize_t>& size,
Owned<string> buffer,
size_t chunk,
@@ -165,9 +165,9 @@ static Future<string> _recv(
// We've been asked to receive until EOF so keep receiving since
// according to the 'length == 0' check above we haven't reached
// EOF yet.
- return socket.recv(data.get(), chunk)
+ return impl->recv(data.get(), chunk)
.then(lambda::bind(&_recv,
- socket,
+ impl,
size,
buffer,
chunk,
@@ -176,9 +176,9 @@ static Future<string> _recv(
} else if (static_cast<string::size_type>(size.get()) > buffer->size()) {
// We've been asked to receive a particular amount of data and we
// haven't yet received that much data so keep receiving.
- return socket.recv(data.get(), size.get() - buffer->size())
+ return impl->recv(data.get(), size.get() - buffer->size())
.then(lambda::bind(&_recv,
- socket,
+ impl,
size,
buffer,
chunk,
@@ -206,7 +206,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
return recv(data.get(), chunk)
.then(lambda::bind(&_recv,
- socket(),
+ shared_from_this(),
size,
buffer,
chunk,
@@ -216,7 +216,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
static Future<Nothing> _send(
- Socket socket,
+ const std::shared_ptr<Socket::Impl>& impl,
Owned<string> data,
size_t index,
size_t length)
@@ -230,8 +230,8 @@ static Future<Nothing> _send(
}
// Keep sending!
- return socket.send(data->data() + index, data->size() - index)
- .then(lambda::bind(&_send, socket, data, index, lambda::_1));
+ return impl->send(data->data() + index, data->size() - index)
+ .then(lambda::bind(&_send, impl, data, index, lambda::_1));
}
@@ -240,7 +240,7 @@ Future<Nothing> Socket::Impl::send(const string& _data)
Owned<string> data(new string(_data));
return send(data->data(), data->size())
- .then(lambda::bind(&_send, socket(), data, 0, lambda::_1));
+ .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
}