You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:06:00 UTC
[09/12] mesos git commit: Add a factory pattern for constructing a
Socket.
Add a factory pattern for constructing a Socket.
Add a factory pattern to allow construction of different
implementations of the socket interface. Remove default constructor
from Socket.
Review: https://reviews.apache.org/r/28670
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3bb74c2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3bb74c2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3bb74c2
Branch: refs/heads/master
Commit: b3bb74c2359c6cf3d73bcb0e57a4732af804a15b
Parents: 62ad3f2
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 17:26:13 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/include/process/socket.hpp | 76 +++++-----------
3rdparty/libprocess/src/poll_socket.cpp | 6 +-
3rdparty/libprocess/src/process.cpp | 92 +++++++++++++-------
3rdparty/libprocess/src/socket.cpp | 56 ++++++++++++
3rdparty/libprocess/src/tests/decoder_tests.cpp | 12 ++-
6 files changed, 152 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index f401232..817355e 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -46,6 +46,7 @@ libprocess_la_SOURCES = \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
+ src/socket.cpp \
src/subprocess.cpp \
src/synchronized.hpp \
src/timeseries.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7e1e3f2..436761b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -109,6 +109,20 @@ inline Try<Node> getsockname(int s, sa_family_t family)
class Socket
{
public:
+ // Available kinds of implementations.
+ enum Kind {
+ POLL
+ // TODO(jmlvanre): Add libevent socket.
+ };
+
+ // Returns an instance of a Socket using the specified kind of
+ // implementation and potentially wrapping the specified file
+ // descriptor.
+ static Try<Socket> create(Kind kind = DEFAULT_KIND(), int s = -1);
+
+ // Returns the default kind of implementation of Socket.
+ static const Kind& DEFAULT_KIND();
+
// Each socket is a reference counted, shared by default, concurrent
// object. However, since we want to support multiple
// implementations we use the Pimpl pattern (often called the
@@ -117,24 +131,21 @@ public:
class Impl : public std::enable_shared_from_this<Impl>
{
public:
- Impl() : s(-1) {}
-
- explicit Impl(int _s) : s(_s) {}
+ explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
~Impl()
{
- if (s >= 0) {
- Try<Nothing> close = os::close(s);
- if (close.isError()) {
- ABORT("Failed to close socket " +
- stringify(s) + ": " + close.error());
- }
+ CHECK(s >= 0);
+ Try<Nothing> close = os::close(s);
+ if (close.isError()) {
+ ABORT("Failed to close socket " +
+ stringify(s) + ": " + close.error());
}
}
int get() const
{
- return s >= 0 ? s : create().get();
+ return s;
}
Future<Nothing> connect(const Node& node);
@@ -152,52 +163,9 @@ public:
Future<Socket> accept();
private:
- const Impl& create() const
- {
- CHECK(s < 0);
-
- // Supported in Linux >= 2.6.27.
-#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
- Try<int> fd =
- network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-
- if (fd.isError()) {
- ABORT("Failed to create socket: " + fd.error());
- }
-#else
- Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
- if (fd.isError()) {
- ABORT("Failed to create socket: " + fd.error());
- }
-
- Try<Nothing> nonblock = os::nonblock(fd.get());
- if (nonblock.isError()) {
- ABORT("Failed to create socket, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(fd.get());
- if (cloexec.isError()) {
- ABORT("Failed to create socket, cloexec: " + cloexec.error());
- }
-#endif
-
- s = fd.get();
- return *this;
- }
-
- // Mutable so that the socket can be lazily created.
- //
- // TODO(benh): Create a factory for sockets and don't lazily
- // create but instead return a Try<Socket> from the factory in
- // order to eliminate the need for a mutable member or the call to
- // ABORT above.
- mutable int s;
+ int s;
};
- Socket() : impl(std::make_shared<Impl>()) {}
-
- explicit Socket(int s) : impl(std::make_shared<Impl>(s)) {}
-
bool operator == (const Socket& that) const
{
return impl == that.impl;
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 5202750..09cd5a2 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -200,7 +200,11 @@ Future<Socket> accept(int fd)
"Failed to turn off the Nagle algorithm: " + stringify(error));
}
- return Socket(s);
+ Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+ if (socket.isError()) {
+ return Failure("Failed to accept, create socket: " + socket.error());
+ }
+ return socket.get();
}
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 72c0ad4..25981ca 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -306,7 +306,7 @@ private:
} links;
// Collection of all actice sockets.
- map<int, Socket> sockets;
+ map<int, Socket*> sockets;
// Collection of sockets that should be disposed when they are
// finished being used (e.g., when there is no more data to send on
@@ -452,7 +452,7 @@ static uint32_t __id__ = 0;
static const int LISTEN_BACKLOG = 500000;
// Local server socket.
-static Socket __s__;
+static Socket* __s__ = NULL;
// Local node.
static Node __node__;
@@ -721,7 +721,7 @@ void on_accept(const Future<Socket>& socket)
decoder));
}
- __s__.accept()
+ __s__->accept()
.onAny(lambda::bind(&on_accept, lambda::_1));
}
@@ -853,14 +853,19 @@ void initialize(const string& delegate)
}
// Create a "server" socket for communicating with other nodes.
+ Try<Socket> create = Socket::create();
+ if (create.isError()) {
+ PLOG(FATAL) << "Failed to construct server socket:" << create.error();
+ }
+ __s__ = new Socket(create.get());
// Allow address reuse.
int on = 1;
- if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+ if (setsockopt(__s__->get(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
- Try<Node> bind = __s__.bind(__node__);
+ Try<Node> bind = __s__->bind(__node__);
if (bind.isError()) {
PLOG(FATAL) << "Failed to initialize: " << bind.error();
}
@@ -889,7 +894,7 @@ void initialize(const string& delegate)
__node__.ip = ip.get();
}
- Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+ Try<Nothing> listen = __s__->listen(LISTEN_BACKLOG);
if (listen.isError()) {
PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
@@ -898,7 +903,7 @@ void initialize(const string& delegate)
// 'spawn' below for the garbage collector.
initializing = false;
- __s__.accept()
+ __s__->accept()
.onAny(lambda::bind(&internal::on_accept, lambda::_1));
// TODO(benh): Make sure creating the garbage collector, logging
@@ -1232,7 +1237,7 @@ SocketManager::~SocketManager() {}
void SocketManager::accepted(const Socket& socket)
{
synchronized (this) {
- sockets[socket] = socket;
+ sockets[socket] = new Socket(socket);
}
}
@@ -1324,22 +1329,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
CHECK(process != NULL);
- Socket socket;
+ Option<Socket> socket = None();
bool connect = false;
synchronized (this) {
- links.linkers[to].insert(process);
- links.linkees[process].insert(to);
- if (to.node != __node__) {
- links.remotes[to.node].insert(to);
- }
-
// Check if node is remote and there isn't a persistant link.
if (to.node != __node__ && persists.count(to.node) == 0) {
// Okay, no link, let's create a socket.
- int s = socket;
+ Try<Socket> create = Socket::create();
+ if (create.isError()) {
+ VLOG(1) << "Failed to link, create socket: " << create.error();
+ return;
+ }
+ socket = create.get();
+ int s = socket.get().get();
- sockets[s] = socket;
+ sockets[s] = new Socket(socket.get());
nodes[s] = to.node;
persists[to.node] = s;
@@ -1353,14 +1358,21 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
connect = true;
}
+
+ links.linkers[to].insert(process);
+ links.linkees[process].insert(to);
+ if (to.node != __node__) {
+ links.remotes[to.node].insert(to);
+ }
}
if (connect) {
- socket.connect(to.node)
+ CHECK_SOME(socket);
+ Socket(socket.get()).connect(to.node) // Copy to drop const.
.onAny(lambda::bind(
&internal::link_connect,
lambda::_1,
- new Socket(socket)));
+ new Socket(socket.get())));
}
}
@@ -1377,7 +1389,7 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
if (proxies.count(socket) > 0) {
return proxies[socket]->self();
} else {
- proxy = new HttpProxy(sockets[socket]);
+ proxy = new HttpProxy(*sockets[socket]);
proxies[socket] = proxy;
}
}
@@ -1569,7 +1581,7 @@ void SocketManager::send(Message* message)
const Node& node = message->to.node;
- Socket socket;
+ Option<Socket> socket = None();
bool connect = false;
synchronized (this) {
@@ -1579,28 +1591,35 @@ void SocketManager::send(Message* message)
if (persist || temp) {
int s = persist ? persists[node] : temps[node];
CHECK(sockets.count(s) > 0);
- socket = sockets[s];
+ socket = *sockets[s];
// Update whether or not this socket should get disposed after
// there is no more data to send.
if (!persist) {
- dispose.insert(socket);
+ dispose.insert(socket.get());
}
- if (outgoing.count(socket) > 0) {
- outgoing[socket].push(new MessageEncoder(socket, message));
+ if (outgoing.count(socket.get()) > 0) {
+ outgoing[socket.get()].push(new MessageEncoder(socket.get(), message));
return;
} else {
// Initialize the outgoing queue.
- outgoing[socket];
+ outgoing[socket.get()];
}
} else {
// No peristent or temporary socket to the node currently
// exists, so we create a temporary one.
- int s = socket;
+ Try<Socket> create = Socket::create();
+ if (create.isError()) {
+ VLOG(1) << "Failed to send, create socket: " << create.error();
+ delete message;
+ return;
+ }
+ socket = create.get();
+ int s = socket.get();
- sockets[s] = socket;
+ sockets[s] = new Socket(socket.get());
nodes[s] = node;
temps[node] = s;
@@ -1614,16 +1633,19 @@ void SocketManager::send(Message* message)
}
if (connect) {
- socket.connect(node)
+ CHECK_SOME(socket);
+ Socket(socket.get()).connect(node) // Copy to drop const.
.onAny(lambda::bind(
&internal::send_connect,
lambda::_1,
- new Socket(socket),
+ new Socket(socket.get()),
message));
} else {
// If we're not connecting and we haven't added the encoder to
// the 'outgoing' queue then schedule it to be sent.
- internal::send(new MessageEncoder(socket, message), new Socket(socket));
+ internal::send(
+ new MessageEncoder(socket.get(), message),
+ new Socket(socket.get()));
}
}
@@ -1675,7 +1697,9 @@ Encoder* SocketManager::next(int s)
}
dispose.erase(s);
- sockets.erase(s);
+ auto iterator = sockets.find(s);
+ delete iterator->second;
+ sockets.erase(iterator);
// We don't actually close the socket (we wait for the Socket
// abstraction to close it once there are no more references),
@@ -1752,7 +1776,9 @@ void SocketManager::close(int s)
shutdown(s, SHUT_RD);
dispose.erase(s);
- sockets.erase(s);
+ auto iterator = sockets.find(s);
+ delete iterator->second;
+ sockets.erase(iterator);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
new file mode 100644
index 0000000..842a6ec
--- /dev/null
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -0,0 +1,56 @@
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+ // TODO(jmlvanre): Change the default based on configure or
+ // environment flags.
+ static const Kind DEFAULT = POLL;
+ return DEFAULT;
+}
+
+Try<Socket> Socket::create(Kind kind, int s)
+{
+ if (s < 0) {
+ // Supported in Linux >= 2.6.27.
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+ Try<int> fd =
+ network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+
+ if (fd.isError()) {
+ return Error("Failed to create socket: " + fd.error());
+ }
+#else
+ Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
+ if (fd.isError()) {
+ return Error("Failed to create socket: " + fd.error());
+ }
+
+ Try<Nothing> nonblock = os::nonblock(fd.get());
+ if (nonblock.isError()) {
+ return Error("Failed to create socket, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(fd.get());
+ if (cloexec.isError()) {
+ return Error("Failed to create socket, cloexec: " + cloexec.error());
+ }
+#endif
+
+ s = fd.get();
+ }
+
+ switch (kind) {
+ case POLL: {
+ return Socket(std::make_shared<Socket::Impl>(s));
+ }
+ // By not setting a default we leverage the compiler errors when
+ // the enumeration is augmented to find all the cases we need to
+ // provide.
+ }
+}
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index ef52798..d65f5cf 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -19,7 +19,9 @@ using process::network::Socket;
TEST(Decoder, Request)
{
- DataDecoder decoder = DataDecoder(Socket());
+ Try<Socket> socket = Socket::create();
+ ASSERT_SOME(socket);
+ DataDecoder decoder = DataDecoder(socket.get());
const string& data =
"GET /path/file.json?key1=value1&key2=value2#fragment HTTP/1.1\r\n"
@@ -55,7 +57,9 @@ TEST(Decoder, Request)
TEST(Decoder, RequestHeaderContinuation)
{
- DataDecoder decoder = DataDecoder(Socket());
+ Try<Socket> socket = Socket::create();
+ ASSERT_SOME(socket);
+ DataDecoder decoder = DataDecoder(socket.get());
const string& data =
"GET /path/file.json HTTP/1.1\r\n"
@@ -79,7 +83,9 @@ TEST(Decoder, RequestHeaderContinuation)
// This is expected to fail for now, see my TODO(bmahler) on http::Request.
TEST(Decoder, DISABLED_RequestHeaderCaseInsensitive)
{
- DataDecoder decoder = DataDecoder(Socket());
+ Try<Socket> socket = Socket::create();
+ ASSERT_SOME(socket);
+ DataDecoder decoder = DataDecoder(socket.get());
const string& data =
"GET /path/file.json HTTP/1.1\r\n"