You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:22 UTC
[25/30] mesos git commit: Add connect() to the Socket interface.
Add connect() to the Socket interface.
Review: https://reviews.apache.org/r/27958
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d616f8d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d616f8d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d616f8d
Branch: refs/heads/master
Commit: 4d616f8d7194be9b251aa5a3ddc51783c4d4411a
Parents: 702b382
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:45:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 41 ++++-
3rdparty/libprocess/src/process.cpp | 193 +++++++++++---------
2 files changed, 138 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 9f4302e..fdad91f 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -3,8 +3,12 @@
#include <assert.h>
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/node.hpp>
+
#include <stout/abort.hpp>
-#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
@@ -14,7 +18,8 @@ namespace process {
// Returns a socket fd for the specified options. Note that on OS X,
// the returned socket will have the SO_NOSIGPIPE option set.
-inline Try<int> socket(int family, int type, int protocol) {
+inline Try<int> socket(int family, int type, int protocol)
+{
int s;
if ((s = ::socket(family, type, protocol)) == -1) {
return ErrnoError();
@@ -41,7 +46,12 @@ inline Try<int> socket(int family, int type, int protocol) {
class Socket
{
public:
- class Impl
+ // Each socket is a reference counted, shared by default, concurrent
+ // object. However, since we want to support multiple
+ // implementations we use the Pimpl pattern (often called the
+ // compilation firewall) rather than forcing each Socket
+ // implementation to do this themselves.
+ class Impl : public std::enable_shared_from_this<Impl>
{
public:
Impl() : s(-1) {}
@@ -53,8 +63,8 @@ public:
if (s >= 0) {
Try<Nothing> close = os::close(s);
if (close.isError()) {
- ABORT(
- "Failed to close socket " + stringify(s) + ": " + close.error());
+ ABORT("Failed to close socket " +
+ stringify(s) + ": " + close.error());
}
}
}
@@ -64,6 +74,8 @@ public:
return s >= 0 ? s : create().get();
}
+ Future<Socket> connect(const Node& node);
+
private:
const Impl& create() const
{
@@ -78,6 +90,11 @@ public:
}
// Mutable so that the socket can be lazily created.
+ //
+ // TODO(benh): Create a factory for sockets and don't lazily
+ // create but instead return a Try<Socket> from the factory in
+ // order to eliminate the need for a mutable member or the call to
+ // ABORT above.
mutable int s;
};
@@ -95,8 +112,20 @@ public:
return impl->get();
}
+ int get() const
+ {
+ return impl->get();
+ }
+
+ Future<Socket> connect(const Node& node)
+ {
+ return impl->connect(node);
+ }
+
private:
- memory::shared_ptr<Impl> impl;
+ explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+
+ std::shared_ptr<Impl> impl;
};
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 48e5486..6916cbb 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1484,6 +1484,48 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
}
+namespace internal {
+/* Continuation chained by Socket::Impl::connect() after the socket has
+ * polled writable: decides whether the in-progress connect succeeded.
+ *
+ * Reads the SO_ERROR socket option from the descriptor returned by
+ * socket.get(). Returns a Failure if getsockopt() itself fails or if
+ * the reported error value is non-zero; otherwise returns the same
+ * socket that was passed in.
+ */
+Future<Socket> connect(const Socket& socket)
+{
+ // Now check that a successful connection was made.
+ int opt;
+ socklen_t optlen = sizeof(opt);
+ int s = socket.get();
+ // A failing getsockopt() and a non-zero SO_ERROR are treated alike.
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ // Connect failure.
+ VLOG(1) << "Socket error while connecting";
+ return Failure("Socket error while connecting");
+ }
+
+ return socket;
+}
+
+} // namespace internal {
+
+/* Starts connecting this socket to `node` over IPv4 and returns a
+ * future for the connected Socket.
+ *
+ * The address is built from node.port (converted with htons) and
+ * node.ip (stored as-is; presumably already in network byte order --
+ * TODO confirm against Node).
+ *
+ * Outcomes:
+ *  - ::connect() succeeds at once: returns a Socket sharing this Impl.
+ *  - ::connect() fails with EINPROGRESS: polls the descriptor for
+ *    writability, then runs internal::connect to check the result.
+ *  - any other errno: returns a Failure built from ErrnoError.
+ *
+ * NOTE(review): shared_from_this() assumes this Impl is already owned
+ * by a std::shared_ptr -- confirm for every construction path.
+ */
+Future<Socket> Socket::Impl::connect(const Node& node)
+{
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_port = htons(node.port);
+ addr.sin_addr.s_addr = node.ip;
+
+ if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+ if (errno != EINPROGRESS) {
+ return Failure(ErrnoError("Failed to connect socket"));
+ }
+ // EINPROGRESS: the Socket bound below is handed to internal::connect.
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+ }
+ // Connected without waiting.
+ return Socket(shared_from_this());
+}
+
+
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1503,6 +1545,25 @@ Socket SocketManager::accepted(int s)
}
+namespace internal {
+/* Completion callback for the connect started by SocketManager::link().
+ *
+ * On a discarded or failed future: logs the failure text (failed case
+ * only), asks socket_manager to close the socket, and returns.
+ * On success: polls the socket for readability and routes the result to
+ * ignore_data.
+ *
+ * NOTE(review): the failure branch calls socket.get() on a future that
+ * is failed or discarded, i.e. one that holds no Socket value. Confirm
+ * how Future::get() behaves in that state; the descriptor to close may
+ * need to be supplied by the caller instead.
+ */
+void link_connect(const Future<Socket>& socket)
+{
+ if (socket.isDiscarded() || socket.isFailed()) {
+ if (socket.isFailed()) {
+ VLOG(1) << "Failed to link, connect: " << socket.failure();
+ }
+ socket_manager->close(socket.get());
+ return;
+ }
+ // Heap-allocated Socket; presumably ignore_data deletes it -- verify.
+ io::poll(socket.get(), io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+}
+
+} // namespace internal {
+
+
void SocketManager::link(ProcessBase* process, const UPID& to)
{
// TODO(benh): The semantics we want to support for link are such
@@ -1516,58 +1577,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
CHECK(process != NULL);
synchronized (this) {
+ links[to].insert(process);
+
// Check if node is remote and there isn't a persistent link.
if (to.node != __node__ && persists.count(to.node) == 0) {
// Okay, no link, let's create a socket.
- Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
- if (socket.isError()) {
- LOG(FATAL) << "Failed to link, socket: " << socket.error();
- }
-
- int s = socket.get();
-
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
- }
+ Socket socket;
+ int s = socket;
- sockets[s] = Socket(s);
+ sockets[s] = socket;
nodes[s] = to.node;
persists[to.node] = s;
- // Try and connect to the node using this socket in order to
- // start reading data. Note that we don't expect to receive
- // anything other than HTTP '202 Accepted' responses which we
- // anyway ignore. We do, however, want to react when it gets
- // closed so we can generate appropriate lost events (since this
- // is a 'link').
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_port = htons(to.node.port);
- addr.sin_addr.s_addr = to.node.ip;
-
- if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
- if (errno != EINPROGRESS) {
- PLOG(FATAL) << "Failed to link, connect";
- }
-
- // Wait for socket to be connected.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
- } else {
- io::poll(s, io::READ)
- .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
- }
+ socket.connect(to.node)
+ .onAny(lambda::bind(&internal::link_connect, lambda::_1));
}
-
- links[to].insert(process);
}
}
@@ -1653,11 +1678,40 @@ void SocketManager::send(
}
+namespace internal {
+/* Completion callback for the connect started by
+ * SocketManager::send(Message*).
+ *
+ * On a discarded or failed future: logs the failure text (failed case
+ * only), asks socket_manager to close the socket, deletes `message`,
+ * and returns.
+ * On success: wraps `message` in a new MessageEncoder, starts a read
+ * poll whose result goes to ignore_data, and starts a write poll whose
+ * result goes to send_data with that encoder.
+ *
+ * NOTE(review): the failure branch calls socket.get() on a future that
+ * is failed or discarded, i.e. one that holds no Socket value. Confirm
+ * how Future::get() behaves in that state; the descriptor to close may
+ * need to be supplied by the caller instead.
+ */
+void send_connect(const Future<Socket>& socket, Message* message)
+{
+ if (socket.isDiscarded() || socket.isFailed()) {
+ if (socket.isFailed()) {
+ VLOG(1) << "Failed to send, connect: " << socket.failure();
+ }
+ socket_manager->close(socket.get());
+ delete message;
+ return;
+ }
+ // Encoder and message are presumably released downstream -- verify.
+ Encoder* encoder = new MessageEncoder(socket.get(), message);
+
+ // Read and ignore data from this socket. Note that we don't
+ // expect to receive anything other than HTTP '202 Accepted'
+ // responses which we just ignore.
+ io::poll(socket.get(), io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+
+ // Start polling in order to send data.
+ io::poll(socket.get(), io::WRITE)
+ .onAny(lambda::bind(&send_data, encoder));
+}
+
+} // namespace internal {
+
+
void SocketManager::send(Message* message)
{
CHECK(message != NULL);
- Node node(message->to.node);
+ const Node& node = message->to.node;
synchronized (this) {
// Check if there is already a socket.
@@ -1670,24 +1724,10 @@ void SocketManager::send(Message* message)
} else {
// No persistent or temporary socket to the node currently
// exists, so we create a temporary one.
- Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
- if (socket.isError()) {
- LOG(FATAL) << "Failed to send, socket: " << socket.error();
- }
-
- int s = socket.get();
+ Socket socket;
+ int s = socket;
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
- }
-
- sockets[s] = Socket(s);
+ sockets[s] = socket;
nodes[s] = node;
temps[node] = s;
@@ -1696,35 +1736,8 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
- // Create a message encoder to handle sending this message.
- Encoder* encoder = new MessageEncoder(sockets[s], message);
-
- // Read and ignore data from this socket. Note that we don't
- // expect to receive anything other than HTTP '202 Accepted'
- // responses which we just ignore.
- io::poll(s, io::READ)
- .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
-
- // Try and connect to the node using this socket.
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_port = htons(node.port);
- addr.sin_addr.s_addr = node.ip;
-
- if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
- if (errno != EINPROGRESS) {
- PLOG(FATAL) << "Failed to send, connect";
- }
-
- // Start polling in order to wait for being connected.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&sending_connect, encoder));
- } else {
- // Start polling in order to send data.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&send_data, encoder));
- }
+ socket.connect(node)
+ .onAny(lambda::bind(&internal::send_connect, lambda::_1, message));
}
}
}