You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:23 UTC
[26/30] mesos git commit: Add bind(), listen(),
and accept() to Socket interface.
Add bind(), listen(), and accept() to Socket interface.
Review: https://reviews.apache.org/r/27966
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8279b45e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8279b45e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8279b45e
Branch: refs/heads/master
Commit: 8279b45ed325375bbf8cc5919fd7009c7d2335cf
Parents: 8edab65
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:55:07 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 21 ++
3rdparty/libprocess/src/libev.cpp | 1 -
3rdparty/libprocess/src/libev.hpp | 3 -
3rdparty/libprocess/src/process.cpp | 235 +++++++++++---------
4 files changed, 153 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 5fd8d1b..c022924 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -82,6 +82,12 @@ public:
Future<size_t> sendfile(int fd, off_t offset, size_t size);
+ Try<Node> bind(const Node& node);
+
+ Try<Nothing> listen(int backlog);
+
+ Future<Socket> accept();
+
private:
const Impl& create() const
{
@@ -143,6 +149,21 @@ public:
return impl->sendfile(fd, offset, size);
}
+ Try<Node> bind(const Node& node)
+ {
+ return impl->bind(node);
+ }
+
+ Try<Nothing> listen(int backlog)
+ {
+ return impl->listen(backlog);
+ }
+
+ Future<Socket> accept()
+ {
+ return impl->accept();
+ }
+
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index efc89d8..6560050 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -12,7 +12,6 @@ namespace process {
// libev.hpp (since these need to live in the static data space).
struct ev_loop* loop = NULL;
ev_async async_watcher;
-ev_io server_watcher;
std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
synchronizable(watchers);
std::queue<lambda::function<void(void)>>* functions =
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index bac8b6a..04847e3 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -18,9 +18,6 @@ extern struct ev_loop* loop;
// with IO watchers and functions (via run_in_event_loop).
extern ev_async async_watcher;
-// Server watcher for accepting connections.
-extern ev_io server_watcher;
-
// Queue of I/O watchers to be asynchronously added to the event loop
// (protected by 'watchers' below).
// TODO(benh): Replace this queue with functions that we put in
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9f91020..5d3b947 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -267,7 +267,7 @@ public:
SocketManager();
~SocketManager();
- Socket accepted(int s);
+ void accepted(const Socket& socket);
void link(ProcessBase* process, const UPID& to);
@@ -433,8 +433,11 @@ const string Profiler::STOP_HELP = HELP(
// Unique id that can be assigned to each process.
static uint32_t __id__ = 0;
+// Server socket listen backlog.
+static const int LISTEN_BACKLOG = 500000;
+
// Local server socket.
-static int __s__ = -1;
+static Socket __s__;
// Local node.
static Node __node__;
@@ -640,66 +643,6 @@ void decode_read(
} // namespace internal {
-void accept(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- CHECK_EQ(__s__, watcher->fd);
-
- sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
-
- if (s < 0) {
- return;
- }
-
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return;
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return;
- }
-
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- int on = 1;
- if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- } else {
- // Inform the socket manager for proper bookkeeping.
- Socket socket = socket_manager->accepted(s);
-
- // Allocate a buffer to read into. This can be replaced later
- // when socket supports a read function that provides the
- // buffered data in the resulting callback.
- const size_t size = 80 * 1024;
- char* data = new char[size];
- memset(data, 0, size);
-
- DataDecoder* decoder = new DataDecoder(socket);
-
- socket.read(data, size)
- .onAny(lambda::bind(
- &internal::decode_read,
- lambda::_1,
- data,
- size,
- new Socket(socket),
- decoder));
- }
-}
-
-
void* serve(void* arg)
{
ev_loop(((struct ev_loop*) arg), 0);
@@ -765,6 +708,37 @@ void timedout(list<Timer>&& timers)
// }
+namespace internal {
+
+void on_accept(const Future<Socket>& socket)
+{
+ if (socket.isReady()) {
+ // Inform the socket manager for proper bookkeeping.
+ socket_manager->accepted(socket.get());
+
+ const size_t size = 80 * 1024;
+ char* data = new char[size];
+ memset(data, 0, size);
+
+ DataDecoder* decoder = new DataDecoder(socket.get());
+
+ socket.get().read(data, size)
+ .onAny(lambda::bind(
+ &internal::decode_read,
+ lambda::_1,
+ data,
+ size,
+ new Socket(socket.get()),
+ decoder));
+ }
+
+ __s__.accept()
+ .onAny(lambda::bind(&on_accept, lambda::_1));
+}
+
+} // namespace internal {
+
+
void initialize(const string& delegate)
{
// TODO(benh): Return an error if attempting to initialize again
@@ -866,21 +840,6 @@ void initialize(const string& delegate)
}
// Create a "server" socket for communicating with other nodes.
- if ((__s__ = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- PLOG(FATAL) << "Failed to initialize, socket";
- }
-
- // Make socket non-blocking.
- Try<Nothing> nonblock = os::nonblock(__s__);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
- }
-
- // Set FD_CLOEXEC flag.
- Try<Nothing> cloexec = os::cloexec(__s__);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
- }
// Allow address reuse.
int on = 1;
@@ -888,25 +847,12 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
- // Set up socket.
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = __node__.ip;
- addr.sin_port = htons(__node__.port);
-
- if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
- PLOG(FATAL) << "Failed to initialize, bind " << __node__;
- }
-
- // Lookup and store assigned ip and assigned port.
- socklen_t addrlen = sizeof(addr);
- if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
- PLOG(FATAL) << "Failed to initialize, getsockname";
+ Try<Node> bind = __s__.bind(__node__);
+ if (bind.isError()) {
+ PLOG(FATAL) << "Failed to initialize: " << bind.error();
}
- __node__.ip = addr.sin_addr.s_addr;
- __node__.port = ntohs(addr.sin_port);
+ __node__ = bind.get();
// Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
// actually have a valid external ip address. Note that we need only
@@ -931,8 +877,9 @@ void initialize(const string& delegate)
__node__.ip = *((uint32_t *) he->h_addr_list[0]);
}
- if (listen(__s__, 500000) < 0) {
- PLOG(FATAL) << "Failed to initialize, listen";
+ Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+ if (listen.isError()) {
+ PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
// Initialize libev.
@@ -950,9 +897,6 @@ void initialize(const string& delegate)
ev_async_init(&async_watcher, handle_async);
ev_async_start(loop, &async_watcher);
- ev_io_init(&server_watcher, accept, __s__, EV_READ);
- ev_io_start(loop, &server_watcher);
-
Clock::initialize(lambda::bind(&timedout, lambda::_1));
// ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -979,6 +923,9 @@ void initialize(const string& delegate)
// 'spawn' below for the garbage collector.
initializing = false;
+ __s__.accept()
+ .onAny(lambda::bind(&internal::on_accept, lambda::_1));
+
// TODO(benh): Make sure creating the garbage collector, logging
// process, and profiler always succeeds and use supervisors to make
// sure that none terminate.
@@ -1436,6 +1383,90 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
}
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_addr.s_addr = node.ip;
+ addr.sin_port = htons(node.port);
+
+ if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+ return Error("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) +
+ ":" + stringify(node.port));
+ }
+
+ // Lookup and store assigned ip and assigned port.
+ socklen_t addrlen = sizeof(addr);
+ if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) {
+ return ErrnoError("Failed to bind, getsockname");
+ }
+
+ return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+}
+
+
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+ if (::listen(get(), backlog) < 0) {
+ return ErrnoError();
+ }
+ return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ int s = ::accept(fd, (sockaddr*) &addr, &addrlen);
+
+ if (s < 0) {
+ return Failure(ErrnoError("Failed to accept"));
+ }
+
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+ << nonblock.error();
+ os::close(s);
+ return Failure("Failed to accept, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+ << cloexec.error();
+ os::close(s);
+ return Failure("Failed to accept, cloexec: " + cloexec.error());
+ }
+
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ int on = 1;
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
+
+ return Socket(s);
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::accept()
+{
+ return io::poll(get(), io::READ)
+ .then(lambda::bind(&internal::accept, get()));
+}
+
+
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1445,13 +1476,11 @@ SocketManager::SocketManager()
SocketManager::~SocketManager() {}
-Socket SocketManager::accepted(int s)
+void SocketManager::accepted(const Socket& socket)
{
synchronized (this) {
- return sockets[s] = Socket(s);
+ sockets[socket] = socket;
}
-
- UNREACHABLE();
}