You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:06:02 UTC
[11/12] mesos git commit: Virtualize Socket::Impl interface.
Virtualize Socket::Impl interface.
Review: https://reviews.apache.org/r/28671
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d46e853
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d46e853
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d46e853
Branch: refs/heads/master
Commit: 2d46e853321469bcd7e2d3727254205c19809b7f
Parents: b3bb74c
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Dec 25 10:55:06 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/include/process/socket.hpp | 62 ++++----
3rdparty/libprocess/src/poll_socket.cpp | 153 ++++++++++----------
3rdparty/libprocess/src/poll_socket.hpp | 29 ++++
3rdparty/libprocess/src/socket.cpp | 37 +++--
5 files changed, 163 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 817355e..6ab9cb8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -43,6 +43,7 @@ libprocess_la_SOURCES = \
src/metrics/metrics.cpp \
src/pid.cpp \
src/poll_socket.cpp \
+ src/poll_socket.hpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 436761b..ddb9e36 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -111,8 +111,8 @@ class Socket
public:
// Available kinds of implementations.
enum Kind {
- POLL
- // TODO(jmlvanre): Add libevent socket.
+ POLL,
+ // TODO(jmlvanre): Add libevent SSL socket.
};
// Returns an instance of a Socket using the specified kind of
@@ -131,9 +131,7 @@ public:
class Impl : public std::enable_shared_from_this<Impl>
{
public:
- explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
-
- ~Impl()
+ virtual ~Impl()
{
CHECK(s >= 0);
Try<Nothing> close = os::close(s);
@@ -148,21 +146,21 @@ public:
return s;
}
- Future<Nothing> connect(const Node& node);
-
- Future<size_t> recv(char* data, size_t size);
-
- Future<size_t> send(const char* data, size_t size);
-
- Future<size_t> sendfile(int fd, off_t offset, size_t size);
+ // Socket::Impl interface.
+ virtual Try<Node> bind(const Node& node);
+ virtual Try<Nothing> listen(int backlog) = 0;
+ virtual Future<Socket> accept() = 0;
+ virtual Future<Nothing> connect(const Node& node) = 0;
+ virtual Future<size_t> recv(char* data, size_t size) = 0;
+ virtual Future<size_t> send(const char* data, size_t size) = 0;
+ virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
- Try<Node> bind(const Node& node);
-
- Try<Nothing> listen(int backlog);
+ protected:
+ explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
- Future<Socket> accept();
+ // Construct a Socket wrapper from this implementation.
+ Socket socket() { return Socket(shared_from_this()); }
- private:
int s;
};
@@ -181,44 +179,46 @@ public:
return impl->get();
}
- Future<Nothing> connect(const Node& node)
+ Try<Node> bind(const Node& node)
{
- return impl->connect(node);
+ return impl->bind(node);
}
- Future<size_t> recv(char* data, size_t size) const
+ Try<Nothing> listen(int backlog)
{
- return impl->recv(data, size);
+ return impl->listen(backlog);
}
- Future<size_t> send(const char* data, size_t size) const
+ Future<Socket> accept()
{
- return impl->send(data, size);
+ return impl->accept();
}
- Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+ Future<Nothing> connect(const Node& node)
{
- return impl->sendfile(fd, offset, size);
+ return impl->connect(node);
}
- Try<Node> bind(const Node& node)
+ Future<size_t> recv(char* data, size_t size) const
{
- return impl->bind(node);
+ return impl->recv(data, size);
}
- Try<Nothing> listen(int backlog)
+ Future<size_t> send(const char* data, size_t size) const
{
- return impl->listen(backlog);
+ return impl->send(data, size);
}
- Future<Socket> accept()
+ Future<size_t> sendfile(int fd, off_t offset, size_t size) const
{
- return impl->accept();
+ return impl->sendfile(fd, offset, size);
}
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+ explicit Socket(const std::shared_ptr<Impl>& that) : impl(that) {}
+
std::shared_ptr<Impl> impl;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 09cd5a2..2e70c6c 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -4,12 +4,81 @@
#include <process/socket.hpp>
#include "config.hpp"
+#include "poll_socket.hpp"
using std::string;
namespace process {
namespace network {
+Try<std::shared_ptr<Socket::Impl>> PollSocketImpl::create(int s)
+{
+ return std::make_shared<PollSocketImpl>(s);
+}
+
+
+Try<Nothing> PollSocketImpl::listen(int backlog)
+{
+ if (::listen(get(), backlog) < 0) {
+ return ErrnoError();
+ }
+ return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+ Try<int> accepted = network::accept(fd, AF_INET);
+ if (accepted.isError()) {
+ return Failure(accepted.error());
+ }
+
+ int s = accepted.get();
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+ << nonblock.error();
+ os::close(s);
+ return Failure("Failed to accept, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+ << cloexec.error();
+ os::close(s);
+ return Failure("Failed to accept, cloexec: " + cloexec.error());
+ }
+
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ int on = 1;
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
+
+ Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+ if (socket.isError()) {
+ return Failure("Failed to accept, create socket: " + socket.error());
+ }
+ return socket.get();
+}
+
+} // namespace internal {
+
+
+Future<Socket> PollSocketImpl::accept()
+{
+ return io::poll(get(), io::READ)
+ .then(lambda::bind(&internal::accept, get()));
+}
+
+
namespace internal {
Future<Nothing> connect(const Socket& socket)
@@ -31,13 +100,13 @@ Future<Nothing> connect(const Socket& socket)
} // namespace internal {
-Future<Nothing> Socket::Impl::connect(const Node& node)
+Future<Nothing> PollSocketImpl::connect(const Node& node)
{
Try<int> connect = network::connect(get(), node);
if (connect.isError()) {
if (errno == EINPROGRESS) {
return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+ .then(lambda::bind(&internal::connect, socket()));
}
return Failure(connect.error());
@@ -47,7 +116,7 @@ Future<Nothing> Socket::Impl::connect(const Node& node)
}
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
+Future<size_t> PollSocketImpl::recv(char* data, size_t size)
{
return io::read(get(), data, size);
}
@@ -129,92 +198,18 @@ Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
} // namespace internal {
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
+Future<size_t> PollSocketImpl::send(const char* data, size_t size)
{
return io::poll(get(), io::WRITE)
.then(lambda::bind(&internal::socket_send_data, get(), data, size));
}
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
{
return io::poll(get(), io::WRITE)
.then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
}
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
- Try<int> bind = network::bind(get(), node);
- if (bind.isError()) {
- return Error(bind.error());
- }
-
- // Lookup and store assigned ip and assigned port.
- return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
- if (::listen(get(), backlog) < 0) {
- return ErrnoError();
- }
- return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
- Try<int> accepted = network::accept(fd, AF_INET);
- if (accepted.isError()) {
- return Failure(accepted.error());
- }
-
- int s = accepted.get();
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return Failure("Failed to accept, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return Failure("Failed to accept, cloexec: " + cloexec.error());
- }
-
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- int on = 1;
- if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- return Failure(
- "Failed to turn off the Nagle algorithm: " + stringify(error));
- }
-
- Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
- if (socket.isError()) {
- return Failure("Failed to accept, create socket: " + socket.error());
- }
- return socket.get();
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
- return io::poll(get(), io::READ)
- .then(lambda::bind(&internal::accept, get()));
-}
-
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
new file mode 100644
index 0000000..f7ca08e
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -0,0 +1,29 @@
+#include <memory>
+
+#include <process/socket.hpp>
+
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+
+class PollSocketImpl : public Socket::Impl
+{
+public:
+ static Try<std::shared_ptr<Socket::Impl>> create(int s);
+
+ PollSocketImpl(int s) : Socket::Impl(s) {}
+
+ virtual ~PollSocketImpl() {}
+
+ // Implementation of the Socket::Impl interface.
+ virtual Try<Nothing> listen(int backlog);
+ virtual Future<Socket> accept();
+ virtual Future<Nothing> connect(const Node& node);
+ virtual Future<size_t> recv(char* data, size_t size);
+ virtual Future<size_t> send(const char* data, size_t size);
+ virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+};
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 842a6ec..4b0f6be 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -1,16 +1,10 @@
#include <process/socket.hpp>
+#include "poll_socket.hpp"
+
namespace process {
namespace network {
-const Socket::Kind& Socket::DEFAULT_KIND()
-{
- // TODO(jmlvanre): Change the default based on configure or
- // environment flags.
- static const Kind DEFAULT = POLL;
- return DEFAULT;
-}
-
Try<Socket> Socket::create(Kind kind, int s)
{
if (s < 0) {
@@ -44,7 +38,11 @@ Try<Socket> Socket::create(Kind kind, int s)
switch (kind) {
case POLL: {
- return Socket(std::make_shared<Socket::Impl>(s));
+ Try<std::shared_ptr<Socket::Impl>> socket = PollSocketImpl::create(s);
+ if (socket.isError()) {
+ return Error(socket.error());
+ }
+ return Socket(socket.get());
}
// By not setting a default we leverage the compiler errors when
// the enumeration is augmented to find all the cases we need to
@@ -52,5 +50,26 @@ Try<Socket> Socket::create(Kind kind, int s)
}
}
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+ // TODO(jmlvanre): Change the default based on configure or
+ // environment flags.
+ static const Kind DEFAULT = POLL;
+ return DEFAULT;
+}
+
+
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+ Try<int> bind = network::bind(get(), node);
+ if (bind.isError()) {
+ return Error(bind.error());
+ }
+
+ // Lookup and store assigned IP and assigned port.
+ return network::getsockname(get(), AF_INET);
+}
+
} // namespace network {
} // namespace process {