You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/11/29 21:29:34 UTC
[01/14] mesos git commit: Changes in Mesos to make
http::Request::client optional.
Repository: mesos
Updated Branches:
refs/heads/master a85e28401 -> 913efc85d
Changes in Mesos to make http::Request::client optional.
Review: https://reviews.apache.org/r/54109
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d0805e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d0805e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d0805e9
Branch: refs/heads/master
Commit: 0d0805e914bc50717237e1246097af1d3b7ba92a
Parents: fcf9f20
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Nov 5 00:08:40 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
src/master/http.cpp | 4 +++-
src/slave/http.cpp | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0d0805e9/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 90cbed1..ac560d1 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -389,7 +389,9 @@ void Master::Http::log(const Request& request)
Option<string> forwardedFor = request.headers.get("X-Forwarded-For");
LOG(INFO) << "HTTP " << request.method << " for " << request.url.path
- << " from " << request.client
+ << (request.client.isSome()
+ ? " from " + stringify(request.client.get())
+ : "")
<< (userAgent.isSome()
? " with User-Agent='" + userAgent.get() + "'"
: "")
http://git-wip-us.apache.org/repos/asf/mesos/blob/0d0805e9/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 91bb882..87189dd 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -283,7 +283,9 @@ void Slave::Http::log(const Request& request)
Option<string> forwardedFor = request.headers.get("X-Forwarded-For");
LOG(INFO) << "HTTP " << request.method << " for " << request.url.path
- << " from " << request.client
+ << (request.client.isSome()
+ ? " from " + stringify(request.client.get())
+ : "")
<< (userAgent.isSome()
? " with User-Agent='" + userAgent.get() + "'"
: "")
[03/14] mesos git commit: Updated Socket::Impl::accept to return
std::shared_ptr<Impl>.
Posted by be...@apache.org.
Updated Socket::Impl::accept to return std::shared_ptr<Impl>.
Review: https://reviews.apache.org/r/53457
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c093c8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c093c8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c093c8c
Branch: refs/heads/master
Commit: 5c093c8c3b8e1f47cc501d85170272056010c616
Parents: a85e284
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Nov 1 22:41:40 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 8 ++++++--
3rdparty/libprocess/src/poll_socket.cpp | 21 ++++++++++++---------
3rdparty/libprocess/src/poll_socket.hpp | 2 +-
3 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c093c8c/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index f798af7..97bdcb9 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -148,7 +148,7 @@ public:
* TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
* configurable by the caller of the interface.
*/
- virtual Future<Socket> accept() = 0;
+ virtual Future<std::shared_ptr<Impl>> accept() = 0;
virtual Future<Nothing> connect(const Address& address) = 0;
virtual Future<size_t> recv(char* data, size_t size) = 0;
@@ -296,7 +296,11 @@ public:
Future<Socket> accept()
{
- return impl->accept();
+ return impl->accept()
+ // TODO(benh): Use && for `impl` here!
+ .then([](const std::shared_ptr<Impl>& impl) {
+ return Socket(impl);
+ });
}
Future<Nothing> connect(const Address& address)
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c093c8c/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index eb7b487..d04f048 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -50,7 +50,7 @@ Try<Nothing> PollSocketImpl::listen(int backlog)
namespace internal {
-Future<Socket> accept(int fd)
+Future<int> accept(int fd)
{
Try<int> accepted = network::accept(fd);
if (accepted.isError()) {
@@ -91,21 +91,24 @@ Future<Socket> accept(int fd)
"Failed to turn off the Nagle algorithm: " + stringify(error));
}
- Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
- if (socket.isError()) {
- os::close(s);
- return Failure("Failed to accept, create socket: " + socket.error());
- }
- return socket.get();
+ return s;
}
} // namespace internal {
-Future<Socket> PollSocketImpl::accept()
+Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
{
return io::poll(get(), io::READ)
- .then(lambda::bind(&internal::accept, get()));
+ .then(lambda::bind(&internal::accept, get()))
+ .then([](int s) -> Future<std::shared_ptr<Socket::Impl>> {
+ Try<std::shared_ptr<Socket::Impl>> impl = create(s);
+ if (impl.isError()) {
+ os::close(s);
+ return Failure("Failed to create socket: " + impl.error());
+ }
+ return impl.get();
+ });
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c093c8c/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index d04f3f2..3ba3678 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -30,7 +30,7 @@ public:
// Implementation of the Socket::Impl interface.
virtual Try<Nothing> listen(int backlog);
- virtual Future<Socket> accept();
+ virtual Future<std::shared_ptr<Socket::Impl>> accept();
virtual Future<Nothing> connect(const Address& address);
virtual Future<size_t> recv(char* data, size_t size);
virtual Future<size_t> send(const char* data, size_t size);
[14/14] mesos git commit: Updated http::Connection::disconnect to do
a complete socket shutdown.
Posted by be...@apache.org.
Updated http::Connection::disconnect to do a complete socket shutdown.
Review: https://reviews.apache.org/r/54114
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/913efc85
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/913efc85
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/913efc85
Branch: refs/heads/master
Commit: 913efc85da4632c56ee376bb403c1761bb4945ee
Parents: 4417a4e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 14:25:50 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:19:17 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/http.cpp | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/913efc85/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 03512c6..8a10dca 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1084,9 +1084,8 @@ public:
Future<Nothing> disconnect(const Option<string>& message = None())
{
- Try<Nothing> shutdown = socket.shutdown();
-
- disconnection.set(Nothing());
+ Try<Nothing> shutdown = socket.shutdown(
+ network::Socket::Shutdown::READ_WRITE);
// If a response is still streaming, we send EOF to
// the decoder in order to fail the pipe reader.
@@ -1101,6 +1100,8 @@ public:
pipeline.pop();
}
+ disconnection.set(Nothing());
+
return shutdown;
}
[02/14] mesos git commit: Changes in libprocess to make
http::Request::client optional.
Posted by be...@apache.org.
Changes in libprocess to make http::Request::client optional.
Review: https://reviews.apache.org/r/54108
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fcf9f208
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fcf9f208
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fcf9f208
Branch: refs/heads/master
Commit: fcf9f2082fc04323a74472eecfadaec325440af3
Parents: b64c9d4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Nov 5 00:08:22 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 2 +-
3rdparty/libprocess/src/tests/http_tests.cpp | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fcf9f208/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index d464b9f..08a8390 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -447,7 +447,7 @@ struct Request
// For server requests, this contains the address of the client.
// Note that this may correspond to a proxy or load balancer address.
- network::inet::Address client;
+ Option<network::Address> client;
// Clients can choose to provide the entire body at once
// via BODY or can choose to stream the body over to the
http://git-wip-us.apache.org/repos/asf/mesos/blob/fcf9f208/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 22ec432..de22d20 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -549,7 +549,7 @@ TEST(HTTPTest, PathParse)
http::Response validateGetWithoutQuery(const http::Request& request)
{
- EXPECT_NE(process::address(), request.client);
+ EXPECT_SOME_NE(process::address(), request.client);
EXPECT_EQ("GET", request.method);
EXPECT_THAT(request.url.path, EndsWith("get"));
EXPECT_EQ("", request.body);
@@ -562,7 +562,7 @@ http::Response validateGetWithoutQuery(const http::Request& request)
http::Response validateGetWithQuery(const http::Request& request)
{
- EXPECT_NE(process::address(), request.client);
+ EXPECT_SOME_NE(process::address(), request.client);
EXPECT_EQ("GET", request.method);
EXPECT_THAT(request.url.path, EndsWith("get"));
EXPECT_EQ("", request.body);
[12/14] mesos git commit: Added POSIX socket shutdown types to
Windows header.
Posted by be...@apache.org.
Added POSIX socket shutdown types to Windows header.
This patch adds two missing BSD socket shutdown types
to a Windows header which maps these POSIX constants
to their Windows counterparts.
Review: https://reviews.apache.org/r/53990/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3013ffb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3013ffb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3013ffb
Branch: refs/heads/master
Commit: b3013ffb0711ad70f91b50e90ea17674137c9c15
Parents: 9a4b5e6
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Nov 29 12:18:14 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:19:17 2016 -0800
----------------------------------------------------------------------
3rdparty/stout/include/stout/windows.hpp | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3013ffb/3rdparty/stout/include/stout/windows.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/windows.hpp b/3rdparty/stout/include/stout/windows.hpp
index 3782aa0..c384026 100644
--- a/3rdparty/stout/include/stout/windows.hpp
+++ b/3rdparty/stout/include/stout/windows.hpp
@@ -171,6 +171,8 @@ typedef SSIZE_T ssize_t;
// the Windows versions of these flags to their POSIX equivalents so we don't
// have to change any socket code.
constexpr int SHUT_RD = SD_RECEIVE;
+constexpr int SHUT_WR = SD_SEND;
+constexpr int SHUT_RDWR = SD_BOTH;
constexpr int MSG_NOSIGNAL = 0; // `SIGPIPE` signal does not exist on Windows.
// The following functions are usually macros on POSIX; we provide them here as
[07/14] mesos git commit: Updated usage of network::Address and
network::Socket in Mesos.
Posted by be...@apache.org.
Updated usage of network::Address and network::Socket in Mesos.
Review: https://reviews.apache.org/r/53462
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b64c9d42
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b64c9d42
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b64c9d42
Branch: refs/heads/master
Commit: b64c9d42c114ee81e0e83d96206a8ae1a664464c
Parents: 2529d07
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Nov 3 23:39:07 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
src/tests/fetcher_tests.cpp | 4 ++--
src/tests/utils.cpp | 21 ++++++++++-----------
2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b64c9d42/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 79526ae..8b8baf8 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -413,7 +413,7 @@ TEST_F(FetcherTest, OSNetUriTest)
{
Http http;
- const network::Address& address = http.process->self().address;
+ const network::inet::Address& address = http.process->self().address;
process::http::URL url(
"http",
@@ -458,7 +458,7 @@ TEST_F(FetcherTest, OSNetUriSpaceTest)
{
Http http;
- const network::Address& address = http.process->self().address;
+ const network::inet::Address& address = http.process->self().address;
process::http::URL url(
"http",
http://git-wip-us.apache.org/repos/asf/mesos/blob/b64c9d42/src/tests/utils.cpp
----------------------------------------------------------------------
diff --git a/src/tests/utils.cpp b/src/tests/utils.cpp
index d36aa9c..0a9e5a8 100644
--- a/src/tests/utils.cpp
+++ b/src/tests/utils.cpp
@@ -32,14 +32,13 @@
#include "tests/flags.hpp"
+namespace http = process::http;
+namespace inet = process::network::inet;
+
using std::string;
using process::Future;
using process::UPID;
-using process::address;
-
-namespace http = process::http;
-namespace network = process::network;
namespace mesos {
namespace internal {
@@ -53,7 +52,7 @@ const bool searchInstallationDirectory = false;
JSON::Object Metrics()
{
- UPID upid("metrics", address());
+ UPID upid("metrics", process::address());
// TODO(neilc): This request might timeout if the current value of a
// metric cannot be determined. In tests, a common cause for this is
@@ -73,20 +72,20 @@ JSON::Object Metrics()
Try<uint16_t> getFreePort()
{
// Bind to port=0 to obtain a random unused port.
- Try<network::Socket> socket = network::Socket::create();
+ Try<inet::Socket> socket = inet::Socket::create();
if (socket.isError()) {
return Error(socket.error());
}
- Try<network::Address> result = socket->bind(network::Address());
+ Try<inet::Address> address = socket->bind(inet::Address::ANY_ANY());
- if (result.isSome()) {
- return result->port;
- } else {
- return Error(result.error());
+ if (address.isError()) {
+ return Error(address.error());
}
+ return address->port;
+
// No explicit cleanup of `socket` as we rely on the implementation
// of `Socket` to close the socket on destruction.
}
[06/14] mesos git commit: Refactored network::Address into
inet::Address.
Posted by be...@apache.org.
Refactored network::Address into inet::Address.
Also added unix::Address.
Review: https://reviews.apache.org/r/53460
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/907ef806
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/907ef806
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/907ef806
Branch: refs/heads/master
Commit: 907ef80615a59183d3e78aba784df14595a482a8
Parents: 6a77817
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Nov 3 23:36:41 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/include/process/address.hpp | 292 ++++++++++++++++---
3rdparty/libprocess/include/process/network.hpp | 28 +-
3rdparty/libprocess/include/process/pid.hpp | 2 +-
3rdparty/libprocess/include/process/socket.hpp | 99 ++++++-
.../libprocess/include/process/ssl/gtest.hpp | 4 +-
3rdparty/libprocess/src/http.cpp | 2 +-
3rdparty/libprocess/src/libevent_ssl_socket.cpp | 31 +-
3rdparty/libprocess/src/pid.cpp | 2 +-
3rdparty/libprocess/src/poll_socket.cpp | 34 ++-
3rdparty/libprocess/src/process.cpp | 6 +-
3rdparty/libprocess/src/socket.cpp | 17 +-
3rdparty/libprocess/src/tests/process_tests.cpp | 2 +-
3rdparty/libprocess/src/tests/socket_tests.cpp | 69 +++++
3rdparty/libprocess/src/tests/ssl_tests.cpp | 96 ++++--
3rdparty/libprocess/src/tests/test_linkee.cpp | 2 +-
16 files changed, 569 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 7131989..9d496b8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -232,6 +232,7 @@ libprocess_tests_SOURCES = \
src/tests/profiler_tests.cpp \
src/tests/queue_tests.cpp \
src/tests/reap_tests.cpp \
+ src/tests/socket_tests.cpp \
src/tests/sequence_tests.cpp \
src/tests/shared_tests.cpp \
src/tests/statistics_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 5fd8ac4..01bc065 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -24,11 +24,16 @@
#include <glog/logging.h>
+#ifndef __WINDOWS__
+#include <sys/un.h>
+#endif // __WINDOWS__
+
#include <ostream>
#include <boost/functional/hash.hpp>
#include <stout/abort.hpp>
+#include <stout/check.hpp>
#include <stout/ip.hpp>
#include <stout/net.hpp>
#include <stout/stringify.hpp>
@@ -36,43 +41,126 @@
namespace process {
namespace network {
-// Represents a network "address", subsuming the struct addrinfo and
-// struct sockaddr* that typically is used to encapsulate IP and port.
+namespace inet {
+class Address;
+} // namespace inet {
+
+#ifndef __WINDOWS__
+namespace unix {
+class Address;
+} // namespace unix {
+#endif // __WINDOWS__
+
+// Represents a network "address", subsuming the `struct addrinfo` and
+// `struct sockaddr` that typically is used to encapsulate an address.
//
-// TODO(benh): Create a Family enumeration to replace sa_family_t.
// TODO(jieyu): Move this class to stout.
class Address
{
public:
- Address() : ip(INADDR_ANY), port(0) {}
+ enum class Family {
+ INET,
+#ifndef __WINDOWS__
+ UNIX
+#endif // __WINDOWS__
+ };
+
+ static Try<Address> create(const sockaddr_storage& storage)
+ {
+ switch (storage.ss_family) {
+ case AF_INET:
+#ifndef __WINDOWS__
+ case AF_UNIX:
+#endif // __WINDOWS__
+ return Address(storage);
+ default:
+ return Error("Unsupported family: " + stringify(storage.ss_family));
+ }
+ }
- Address(const net::IP& _ip, uint16_t _port) : ip(_ip), port(_port) {}
+ Family family() const
+ {
+ switch (sockaddr.storage.ss_family) {
+ case AF_INET:
+ return Family::INET;
+#ifndef __WINDOWS__
+ case AF_UNIX:
+ return Family::UNIX;
+#endif // __WINDOWS__
+ default:
+ ABORT("Unexpected family: " + stringify(sockaddr.storage.ss_family));
+ }
+ }
- static Address LOCALHOST_ANY()
+ // Returns the storage size depending on the family of this address.
+ size_t size() const
{
- return Address(net::IP(INADDR_ANY), 0);
+ switch (family()) {
+ case Family::INET:
+ return sizeof(sockaddr_in);
+#ifndef __WINDOWS__
+ case Family::UNIX:
+ return sizeof(sockaddr_un);
+#endif // __WINDOWS__
+ }
}
- static Try<Address> create(const struct sockaddr_storage& storage)
+ operator sockaddr_storage() const
{
- switch (storage.ss_family) {
- case AF_INET: {
- struct sockaddr_in addr = *(struct sockaddr_in*) &storage;
- return Address(net::IP(addr.sin_addr), ntohs(addr.sin_port));
- }
- default: {
- return Error(
- "Unsupported family type: " +
- stringify(storage.ss_family));
- }
- }
+ return sockaddr.storage;
}
- int family() const
+private:
+ friend class inet::Address;
+ #ifndef __WINDOWS__
+ friend class unix::Address;
+#endif // __WINDOWS__
+ template <typename AddressType>
+ friend Try<AddressType> convert(Try<Address>&& address);
+ friend std::ostream& operator<<(std::ostream& stream, const Address& address);
+
+ Address(const sockaddr_storage& storage)
{
- return ip.family();
+ sockaddr.storage = storage;
}
+ union {
+ sockaddr_storage storage;
+ sockaddr_in in;
+#ifndef __WINDOWS__
+ sockaddr_un un;
+#endif // __WINDOWS__
+ } sockaddr;
+};
+
+
+// Helper for converting between Address and other types.
+template <typename AddressType>
+Try<AddressType> convert(Try<Address>&& address);
+
+
+template <>
+inline Try<Address> convert(Try<Address>&& address)
+{
+ return address;
+}
+
+
+namespace inet {
+
+class Address
+{
+public:
+ Address(const net::IP& _ip, uint16_t _port)
+ : ip(_ip), port(_port) {}
+
+ Address(const sockaddr_in& in)
+ : Address(net::IP(in.sin_addr), ntohs(in.sin_port)) {}
+
+ static Address LOOPBACK_ANY() { return Address(net::IP(INADDR_LOOPBACK), 0); }
+
+ static Address ANY_ANY() { return Address(net::IP(INADDR_ANY), 0); }
+
/**
* Returns the hostname of this address's IP.
*
@@ -93,18 +181,6 @@ public:
return hostname.get();
}
- // Returns the storage size (i.e., either sizeof(sockaddr_in) or
- // sizeof(sockaddr_in6) depending on the family) of this address.
- size_t size() const
- {
- switch (family()) {
- case AF_INET:
- return sizeof(sockaddr_in);
- default:
- ABORT("Unsupported family type: " + stringify(family()));
- }
- }
-
bool operator<(const Address& that) const
{
if (ip == that.ip) {
@@ -133,6 +209,22 @@ public:
return !(*this == that);
}
+ bool operator==(const network::Address& that) const;
+
+ operator network::Address() const
+ {
+ union {
+ sockaddr_storage storage;
+ sockaddr_in in;
+ } sockaddr;
+ memset(&sockaddr.storage, 0, sizeof(sockaddr_storage));
+ sockaddr.in.sin_family = AF_INET;
+ sockaddr.in.sin_addr = ip.in().get();
+ sockaddr.in.sin_port = htons(port);
+ return network::Address(sockaddr.storage);
+ }
+
+ // TODO(benh): Use a sockaddr_in here like we do for unix::Address.
net::IP ip;
uint16_t port;
};
@@ -144,21 +236,149 @@ inline std::ostream& operator<<(std::ostream& stream, const Address& address)
return stream;
}
+} // namespace inet {
+
+
+template <>
+inline Try<inet::Address> convert(Try<Address>&& address)
+{
+ if (address.isError()) {
+ return Error(address.error());
+ }
+
+ if (address->family() == Address::Family::INET) {
+ return inet::Address(address->sockaddr.in);
+ }
+
+ return Error("Unexpected address family");
+}
+
+
namespace inet {
-using Address = network::Address;
+
+inline bool Address::operator==(const network::Address& that) const
+{
+ Try<Address> address = convert<Address>(that);
+ if (address.isError()) {
+ return false;
+ }
+ return *this == address.get();
+}
+
} // namespace inet {
+
+#ifndef __WINDOWS__
+namespace unix {
+
+class Address
+{
+public:
+ static Try<Address> create(const std::string& path)
+ {
+ sockaddr_un un;
+
+ const size_t PATH_LENGTH = sizeof(un.sun_path);
+
+ if (path.length() >= PATH_LENGTH) {
+ return Error("Path too long, must be less than " +
+ stringify(PATH_LENGTH) + " bytes");
+ }
+
+ un.sun_family = AF_UNIX;
+ memcpy(un.sun_path, path.c_str(), path.length() + 1);
+
+ return Address(un);
+ }
+
+ Address(const sockaddr_un& un)
+ : sockaddr() // Zero initialize.
+ {
+ sockaddr.un = un;
+ }
+
+ std::string path() const
+ {
+ if (sockaddr.un.sun_path[0] == '\0') {
+ return '\0' + std::string(sockaddr.un.sun_path + 1);
+ }
+
+ return std::string(sockaddr.un.sun_path);
+ }
+
+ operator network::Address() const
+ {
+ return network::Address(sockaddr.storage);
+ }
+
+private:
+ friend std::ostream& operator<<(
+ std::ostream& stream,
+ const Address& address);
+
+ union {
+ sockaddr_storage storage;
+ sockaddr_un un;
+ } sockaddr;
+};
+
+
+inline std::ostream& operator<<(
+ std::ostream& stream,
+ const Address& address)
+{
+ std::string path = address.path();
+ if (!path.empty() && path[0] == '\0') {
+ path[0] = '@';
+ }
+ return stream << path;
+}
+
+} // namespace unix {
+
+
+template <>
+inline Try<unix::Address> convert(Try<Address>&& address)
+{
+ if (address.isError()) {
+ return Error(address.error());
+ }
+
+ if (address->family() == Address::Family::UNIX) {
+ return unix::Address(address->sockaddr.un);
+ }
+
+ return Error("Unexpected address family");
+}
+#endif // __WINDOWS__
+
+
+inline std::ostream& operator<<(std::ostream& stream, const Address& address)
+{
+ switch (address.family()) {
+ case Address::Family::INET: {
+ stream << inet::Address(address.sockaddr.in);
+ }
+#ifndef __WINDOWS__
+ case Address::Family::UNIX: {
+ stream << unix::Address(address.sockaddr.un);
+ }
+#endif // __WINDOWS__
+ }
+ return stream;
+}
+
} // namespace network {
} // namespace process {
namespace std {
template <>
-struct hash<process::network::Address>
+struct hash<process::network::inet::Address>
{
typedef size_t result_type;
- typedef process::network::Address argument_type;
+ typedef process::network::inet::Address argument_type;
result_type operator()(const argument_type& address) const
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/network.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/network.hpp b/3rdparty/libprocess/include/process/network.hpp
index 5211066..8234765 100644
--- a/3rdparty/libprocess/include/process/network.hpp
+++ b/3rdparty/libprocess/include/process/network.hpp
@@ -30,10 +30,10 @@ using net::socket;
// TODO(benh): Remove and defer to Socket::accept.
inline Try<int> accept(int s)
{
- struct sockaddr_storage storage;
- socklen_t storagelen = sizeof(storage);
+ sockaddr_storage storage;
+ socklen_t length = sizeof(storage);
- int accepted = ::accept(s, (struct sockaddr*) &storage, &storagelen);
+ int accepted = ::accept(s, (sockaddr*) &storage, &length);
if (accepted < 0) {
return ErrnoError("Failed to accept");
}
@@ -45,10 +45,9 @@ inline Try<int> accept(int s)
// TODO(benh): Remove and defer to Socket::bind.
inline Try<Nothing> bind(int s, const Address& address)
{
- struct sockaddr_storage storage =
- net::createSockaddrStorage(address.ip, address.port);
+ sockaddr_storage storage = address;
- if (net::bind(s, (struct sockaddr*) &storage, address.size()) < 0) {
+ if (net::bind(s, (sockaddr*) &storage, address.size()) < 0) {
return ErrnoError("Failed to bind on " + stringify(address));
}
@@ -59,10 +58,9 @@ inline Try<Nothing> bind(int s, const Address& address)
// TODO(benh): Remove and defer to Socket::connect.
inline Try<Nothing, SocketError> connect(int s, const Address& address)
{
- struct sockaddr_storage storage =
- net::createSockaddrStorage(address.ip, address.port);
+ sockaddr_storage storage = address;
- if (net::connect(s, (struct sockaddr*) &storage, address.size()) < 0) {
+ if (net::connect(s, (sockaddr*) &storage, address.size()) < 0) {
return SocketError("Failed to connect to " + stringify(address));
}
@@ -78,10 +76,10 @@ inline Try<Nothing, SocketError> connect(int s, const Address& address)
*/
inline Try<Address> address(int s)
{
- struct sockaddr_storage storage;
- socklen_t storagelen = sizeof(storage);
+ sockaddr_storage storage;
+ socklen_t length = sizeof(storage);
- if (::getsockname(s, (struct sockaddr*) &storage, &storagelen) < 0) {
+ if (::getsockname(s, (sockaddr*) &storage, &length) < 0) {
return ErrnoError("Failed to getsockname");
}
@@ -97,10 +95,10 @@ inline Try<Address> address(int s)
*/
inline Try<Address> peer(int s)
{
- struct sockaddr_storage storage;
- socklen_t storagelen = sizeof(storage);
+ sockaddr_storage storage;
+ socklen_t length = sizeof(storage);
- if (::getpeername(s, (struct sockaddr*) &storage, &storagelen) < 0) {
+ if (::getpeername(s, (sockaddr*) &storage, &length) < 0) {
return ErrnoError("Failed to getpeername");
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index b274c55..c634916 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -93,7 +93,7 @@ struct UPID
}
std::string id;
- network::inet::Address address;
+ network::inet::Address address = network::inet::Address::ANY_ANY();
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 087a072..7f489e9 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -95,6 +95,7 @@ public:
// TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
// configurable by the caller of the interface.
static Try<std::shared_ptr<SocketImpl>> create(
+ Address::Family family,
Kind kind = DEFAULT_KIND());
virtual ~SocketImpl()
@@ -238,6 +239,10 @@ template <typename AddressType>
class Socket
{
public:
+ static_assert(
+ std::is_convertible<AddressType, network::Address>::value,
+ "Requires type convertible to `network::Address`");
+
/**
* Returns an instance of a `Socket` using the specified kind of
* implementation.
@@ -273,6 +278,24 @@ public:
static Try<Socket> create(SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND());
/**
+ * Returns an instance of a `Socket` using the specified
+ * `Address::Family` to determine the address family to use. An
+ * optional implementation kind can be specified. The NONBLOCK and
+ * CLOEXEC options will be set on the underlying file descriptor for
+ * the socket.
+ *
+ * NOTE: this is only defined for `AddressType` of
+ * `network::Address`, all others are explicitly deleted.
+ *
+ * @param kind Optional. The desired `Socket` implementation.
+ *
+ * @return An instance of a `Socket`.
+ */
+ static Try<Socket> create(
+ Address::Family family,
+ SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND());
+
+ /**
* Returns the kind representing the underlying implementation
* of the `Socket` instance.
*
@@ -295,12 +318,12 @@ public:
Try<AddressType> address() const
{
- return impl->address();
+ return convert<AddressType>(impl->address());
}
Try<AddressType> peer() const
{
- return impl->peer();
+ return convert<AddressType>(impl->peer());
}
int get() const
@@ -310,7 +333,7 @@ public:
Try<AddressType> bind(const AddressType& address)
{
- return impl->bind(address);
+ return convert<AddressType>(impl->bind(address));
}
Try<Nothing> listen(int backlog)
@@ -388,10 +411,76 @@ private:
using Socket = network::internal::Socket<network::Address>;
namespace inet {
-
using Socket = network::internal::Socket<inet::Address>;
-
} // namespace inet {
+
+#ifndef __WINDOWS__
+namespace unix {
+using Socket = network::internal::Socket<unix::Address>;
+} // namespace unix {
+#endif // __WINDOWS__
+
+
+namespace internal {
+
+template <>
+Try<Socket<network::Address>> Socket<network::Address>::create(
+ SocketImpl::Kind kind) = delete;
+
+
+template <>
+inline Try<Socket<network::Address>> Socket<network::Address>::create(
+ Address::Family family,
+ SocketImpl::Kind kind)
+{
+ Try<std::shared_ptr<SocketImpl>> impl = SocketImpl::create(family, kind);
+ if (impl.isError()) {
+ return Error(impl.error());
+ }
+ return Socket(impl.get());
+}
+
+
+template <>
+inline Try<Socket<inet::Address>> Socket<inet::Address>::create(
+ SocketImpl::Kind kind)
+{
+ Try<std::shared_ptr<SocketImpl>> impl =
+ SocketImpl::create(Address::Family::INET, kind);
+ if (impl.isError()) {
+ return Error(impl.error());
+ }
+ return Socket(impl.get());
+}
+
+
+template <>
+Try<Socket<inet::Address>> Socket<inet::Address>::create(
+ Address::Family family,
+ SocketImpl::Kind kind) = delete;
+
+
+#ifndef __WINDOWS__
+template <>
+inline Try<Socket<unix::Address>> Socket<unix::Address>::create(
+ SocketImpl::Kind kind)
+{
+ Try<std::shared_ptr<SocketImpl>> impl =
+ SocketImpl::create(Address::Family::UNIX, kind);
+ if (impl.isError()) {
+ return Error(impl.error());
+ }
+ return Socket(impl.get());
+}
+
+
+template <>
+Try<Socket<unix::Address>> Socket<unix::Address>::create(
+ Address::Family family,
+ SocketImpl::Kind kind) = delete;
+#endif // __WINDOWS__
+
+} // namespace internal {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/ssl/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/ssl/gtest.hpp b/3rdparty/libprocess/include/process/ssl/gtest.hpp
index 8649da1..2a19b3b 100644
--- a/3rdparty/libprocess/include/process/ssl/gtest.hpp
+++ b/3rdparty/libprocess/include/process/ssl/gtest.hpp
@@ -302,10 +302,10 @@ protected:
process::network::inet::Socket server = create.get();
- // We need to explicitly bind to INADDR_LOOPBACK so the
+ // We need to explicitly bind to the loopback address so the
// certificate we create in this test fixture can be verified.
Try<process::network::inet::Address> bind =
- server.bind(process::network::inet::Address(net::IP(INADDR_LOOPBACK), 0));
+ server.bind(process::network::inet::Address::LOOPBACK_ANY());
if (bind.isError()) {
return Error(bind.error());
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 93a8a80..1f2de9d 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1315,7 +1315,7 @@ Future<Nothing> Connection::disconnected()
Future<Connection> connect(const URL& url)
{
// TODO(bmahler): Move address resolution into the URL class?
- Address address;
+ Address address = Address::ANY_ANY();
if (url.ip.isNone() && url.domain.isNone()) {
return Failure("Expected URL.ip or URL.domain to be set");
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index f9551bf..dddd0e2 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -494,21 +494,25 @@ Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
return Failure("Failed to connect: bufferevent_openssl_socket_new");
}
- // Try and determine the 'peer_hostname' from the address we're
- // connecting to in order to properly verify the certificate later.
- const Try<string> hostname = address.hostname();
+ if (address.family() == Address::Family::INET) {
+ // Try and determine the 'peer_hostname' from the address we're
+ // connecting to in order to properly verify the certificate
+ // later.
+ const Try<string> hostname =
+ network::convert<inet::Address>(address)->hostname();
+
+ if (hostname.isError()) {
+ VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
+ } else {
+ VLOG(2) << "Connecting to " << hostname.get();
+ peer_hostname = hostname.get();
+ }
- if (hostname.isError()) {
- VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
- } else {
- VLOG(2) << "Connecting to " << hostname.get();
- peer_hostname = hostname.get();
+ // Determine the 'peer_ip' from the address we're connecting to in
+ // order to properly verify the certificate later.
+ peer_ip = network::convert<inet::Address>(address)->ip;
}
- // Determine the 'peer_ip' from the address we're connecting to in
- // order to properly verify the certificate later.
- peer_ip = address.ip;
-
// Optimistically construct a 'ConnectRequest' and future.
Owned<ConnectRequest> request(new ConnectRequest());
Future<Nothing> future = request->promise.future();
@@ -533,8 +537,7 @@ Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
run_in_event_loop(
[self, address]() {
- sockaddr_storage addr =
- net::createSockaddrStorage(address.ip, address.port);
+ sockaddr_storage addr = address;
// Assign the callbacks for the bufferevent. We do this
// before the 'bufferevent_socket_connect()' call to avoid
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index ffb1021..023f881 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -101,7 +101,7 @@ istream& operator>>(istream& stream, UPID& pid)
string id;
string host;
- network::inet::Address address;
+ network::inet::Address address = network::inet::Address::ANY_ANY();
size_t index = str.find('@');
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 9184ea3..93ca37f 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -75,21 +75,31 @@ Future<int> accept(int fd)
return Failure("Failed to accept, cloexec: " + cloexec.error());
}
+ Try<Address> address = network::address(s);
+ if (address.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to get address: "
+ << address.error();
+ os::close(s);
+ return Failure("Failed to get address: " + address.error());
+ }
+
// Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
// NOTE: We cast to `char*` here because the function prototypes on Windows
// use `char*` instead of `void*`.
- int on = 1;
- if (::setsockopt(
- s,
- SOL_TCP,
- TCP_NODELAY,
- reinterpret_cast<const char*>(&on),
- sizeof(on)) < 0) {
- const string error = os::strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- return Failure(
- "Failed to turn off the Nagle algorithm: " + stringify(error));
+ if (address->family() == Address::Family::INET) {
+ int on = 1;
+ if (::setsockopt(
+ s,
+ SOL_TCP,
+ TCP_NODELAY,
+ reinterpret_cast<const char*>(&on),
+ sizeof(on)) < 0) {
+ const string error = os::strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
}
return s;
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a07d5e3..8204327 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -580,7 +580,7 @@ static std::mutex* socket_mutex = new std::mutex();
static Future<Socket> future_accept;
// Local socket address.
-static Address __address__;
+static Address __address__ = Address::ANY_ANY();
// Active SocketManager (eventually will probably be thread-local).
static SocketManager* socket_manager = nullptr;
@@ -1072,7 +1072,7 @@ bool initialize(
Clock::initialize(lambda::bind(&timedout, lambda::_1));
// Fill in the local IP and port for inter-libprocess communication.
- __address__ = Address::LOCALHOST_ANY();
+ __address__ = Address::ANY_ANY();
// Fetch and parse the libprocess environment variables.
internal::Flags flags;
@@ -1310,7 +1310,7 @@ void finalize()
// Clear the public address of the server socket.
// NOTE: This variable is necessary for process communication, so it
// cannot be cleared until after the `ProcessManager` is deleted.
- __address__ = Address::LOCALHOST_ANY();
+ __address__ = Address::ANY_ANY();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 0b7631d..2eaa5fc 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -47,18 +47,29 @@ Try<std::shared_ptr<SocketImpl>> SocketImpl::create(int s, Kind kind)
}
-Try<std::shared_ptr<SocketImpl>> SocketImpl::create(Kind kind)
+Try<std::shared_ptr<SocketImpl>> SocketImpl::create(
+ Address::Family family,
+ Kind kind)
{
+ int domain = [=]() {
+ switch (family) {
+ case Address::Family::INET: return AF_INET;
+#ifndef __WINDOWS__
+ case Address::Family::UNIX: return AF_UNIX;
+#endif // __WINDOWS__
+ }
+ }();
+
// Supported in Linux >= 2.6.27.
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
Try<int> s =
- network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+ network::socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (s.isError()) {
return Error("Failed to create socket: " + s.error());
}
#else
- Try<int> s = network::socket(AF_INET, SOCK_STREAM, 0);
+ Try<int> s = network::socket(domain, SOCK_STREAM, 0);
if (s.isError()) {
return Error("Failed to create socket: " + s.error());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 0424a10..cd159b6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1412,7 +1412,7 @@ TEST_TEMP_DISABLED_ON_WINDOWS(ProcessTest, Http2)
Socket socket = create.get();
- ASSERT_SOME(socket.bind(Address()));
+ ASSERT_SOME(socket.bind(Address::ANY_ANY()));
// Create a UPID for 'Libprocess-From' based on the IP and port we
// got assigned.
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/socket_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/socket_tests.cpp b/3rdparty/libprocess/src/tests/socket_tests.cpp
new file mode 100644
index 0000000..44c3c9a
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/socket_tests.cpp
@@ -0,0 +1,69 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <process/gtest.hpp>
+#include <process/future.hpp>
+#include <process/socket.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/try.hpp>
+
+#include <stout/tests/utils.hpp>
+
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
+
+using process::Future;
+
+using std::string;
+
+class SocketTest : public TemporaryDirectoryTest {};
+
+#ifndef __WINDOWS__
+TEST_F(SocketTest, Unix)
+{
+ Try<unix::Socket> server = unix::Socket::create();
+ ASSERT_SOME(server);
+
+ Try<unix::Socket> client = unix::Socket::create();
+ ASSERT_SOME(client);
+
+ // Use a path in the temporary directory so it gets cleaned up.
+ string path = path::join(sandbox.get(), "socket");
+
+ Try<unix::Address> address = unix::Address::create(path);
+ ASSERT_SOME(address);
+
+ ASSERT_SOME(server->bind(address.get()));
+ ASSERT_SOME(server->listen(1));
+
+ Future<unix::Socket> accept = server->accept();
+
+ AWAIT_READY(client->connect(address.get()));
+ AWAIT_READY(accept);
+
+ unix::Socket socket = accept.get();
+
+ const string data = "Hello World!";
+
+ AWAIT_READY(client->send(data));
+ AWAIT_EQ(data, socket.recv(data.size()));
+
+ AWAIT_READY(socket.send(data));
+ AWAIT_EQ(data, client->recv(data.size()));
+}
+#endif // __WINDOWS__
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/ssl_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_tests.cpp b/3rdparty/libprocess/src/tests/ssl_tests.cpp
index bdb9420..7fa46e4 100644
--- a/3rdparty/libprocess/src/tests/ssl_tests.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_tests.cpp
@@ -48,6 +48,9 @@ namespace http = process::http;
namespace io = process::io;
namespace network = process::network;
namespace openssl = network::openssl;
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
using network::inet::Address;
using network::inet::Socket;
@@ -132,50 +135,97 @@ TEST_P(SSLTest, BasicSameProcess)
openssl::reinitialize();
- const Try<Socket> server_create = Socket::create(SocketImpl::Kind::SSL);
- ASSERT_SOME(server_create);
+ Try<Socket> server = Socket::create(SocketImpl::Kind::SSL);
+ ASSERT_SOME(server);
- const Try<Socket> client_create = Socket::create(SocketImpl::Kind::SSL);
- ASSERT_SOME(client_create);
+ Try<Socket> client = Socket::create(SocketImpl::Kind::SSL);
+ ASSERT_SOME(client);
- Socket server = server_create.get();
- Socket client = client_create.get();
+ // We need to explicitly bind to the loopback address so the
+ // certificate we create in this test fixture can be verified.
+ ASSERT_SOME(server->bind(Address::LOOPBACK_ANY()));
+ ASSERT_SOME(server->listen(BACKLOG));
- // We need to explicitly bind to INADDR_LOOPBACK so the certificate
- // we create in this test fixture can be verified.
- ASSERT_SOME(server.bind(Address(net::IP(INADDR_LOOPBACK), 0)));
+ Try<Address> address = server->address();
+ ASSERT_SOME(address);
- const Try<Nothing> listen = server.listen(BACKLOG);
- ASSERT_SOME(listen);
+ Future<Socket> accept = server->accept();
- const Try<Address> server_address = server.address();
- ASSERT_SOME(server_address);
+ AWAIT_ASSERT_READY(client->connect(address.get()));
- const Future<Socket> _socket = server.accept();
+ // Wait for the server to have accepted the client connection.
+ AWAIT_ASSERT_READY(accept);
- const Future<Nothing> connect = client.connect(server_address.get());
+ Socket socket = accept.get();
+
+ // Send a message from the client to the server.
+ const string data = "Hello World!";
+ AWAIT_ASSERT_READY(client->send(data));
+
+ // Verify the server received the message.
+ AWAIT_ASSERT_EQ(data, socket.recv(data.size()));
+
+ // Send the message back from the server to the client.
+ AWAIT_ASSERT_READY(socket.send(data));
+
+ // Verify the client received the message.
+ AWAIT_ASSERT_EQ(data, client->recv(data.size()));
+}
+
+
+#ifndef __WINDOWS__
+TEST_P(SSLTest, BasicSameProcessUnix)
+{
+ os::setenv("LIBPROCESS_SSL_ENABLED", "true");
+ os::setenv("LIBPROCESS_SSL_KEY_FILE", key_path().string());
+ os::setenv("LIBPROCESS_SSL_CERT_FILE", certificate_path().string());
+ // NOTE: we must set LIBPROCESS_SSL_REQUIRE_CERT to false because we
+ // don't have a hostname or IP to verify!
+ os::setenv("LIBPROCESS_SSL_REQUIRE_CERT", "false");
+ os::setenv("LIBPROCESS_SSL_CA_DIR", os::getcwd());
+ os::setenv("LIBPROCESS_SSL_CA_FILE", certificate_path().string());
+ os::setenv("LIBPROCESS_SSL_VERIFY_IPADD", GetParam());
+
+ openssl::reinitialize();
+
+ Try<unix::Socket> server = unix::Socket::create(SocketImpl::Kind::SSL);
+ ASSERT_SOME(server);
+
+ Try<unix::Socket> client = unix::Socket::create(SocketImpl::Kind::SSL);
+ ASSERT_SOME(client);
+
+ // Use a path in the temporary directory so it gets cleaned up.
+ string path = path::join(sandbox.get(), "socket");
+
+ Try<unix::Address> address = unix::Address::create(path);
+ ASSERT_SOME(address);
+
+ ASSERT_SOME(server->bind(address.get()));
+ ASSERT_SOME(server->listen(BACKLOG));
+
+ Future<unix::Socket> accept = server->accept();
+
+ AWAIT_ASSERT_READY(client->connect(address.get()));
// Wait for the server to have accepted the client connection.
- AWAIT_ASSERT_READY(_socket);
- Socket socket = _socket.get(); // TODO(jmlvanre): Remove const copy.
+ AWAIT_ASSERT_READY(accept);
- // Verify that the client also views the connection as established.
- AWAIT_ASSERT_READY(connect);
+ unix::Socket socket = accept.get();
// Send a message from the client to the server.
const string data = "Hello World!";
- AWAIT_ASSERT_READY(client.send(data));
+ AWAIT_ASSERT_READY(client->send(data));
// Verify the server received the message.
- AWAIT_ASSERT_EQ(data, socket.recv());
+ AWAIT_ASSERT_EQ(data, socket.recv(data.size()));
// Send the message back from the server to the client.
AWAIT_ASSERT_READY(socket.send(data));
// Verify the client received the message.
- AWAIT_ASSERT_EQ(data, client.recv());
+ AWAIT_ASSERT_EQ(data, client->recv(data.size()));
}
-
+#endif // __WINDOWS__
// Test a basic back-and-forth communication using the 'ssl-client'
// subprocess.
http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 99ea1eb..921d676 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -120,7 +120,7 @@ int main(int argc, char** argv)
}
// Bind to some random port.
- Try<Address> bind = __s__->bind(Address::LOCALHOST_ANY());
+ Try<Address> bind = __s__->bind(Address::ANY_ANY());
if (bind.isError()) {
EXIT(EXIT_FAILURE) << "Failed to bind: " << bind.error();
}
[09/14] mesos git commit: Inlined function only used in one place.
Posted by be...@apache.org.
Inlined function only used in one place.
Review: https://reviews.apache.org/r/53461
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2529d077
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2529d077
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2529d077
Branch: refs/heads/master
Commit: 2529d077a6a1603f7689a684a1e946f16e474006
Parents: 4694267
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Nov 3 23:37:40 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/stout/include/stout/net.hpp | 48 ++++++++++++++-----------------
1 file changed, 21 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2529d077/3rdparty/stout/include/stout/net.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/net.hpp b/3rdparty/stout/include/stout/net.hpp
index 083b8d1..4803aeb 100644
--- a/3rdparty/stout/include/stout/net.hpp
+++ b/3rdparty/stout/include/stout/net.hpp
@@ -198,32 +198,6 @@ inline struct addrinfo createAddrInfo(int socktype, int family, int flags)
}
-// TODO(evelinad): Move this to Address.
-inline struct sockaddr_storage createSockaddrStorage(const IP& ip, int port)
-{
- struct sockaddr_storage storage;
- memset(&storage, 0, sizeof(storage));
-
- switch (ip.family()) {
- case AF_INET: {
- struct sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr = ip.in().get();
- addr.sin_port = htons(port);
-
- memcpy(&storage, &addr, sizeof(addr));
- break;
- }
- default: {
- ABORT("Unsupported family type: " + stringify(ip.family()));
- }
- }
-
- return storage;
-}
-
-
inline Try<std::string> hostname()
{
char host[512];
@@ -252,9 +226,29 @@ inline Try<std::string> hostname()
// Returns a Try of the hostname for the provided IP. If the hostname
// cannot be resolved, then a string version of the IP address is
// returned.
+//
+// TODO(benh): Merge with `net::hostname`.
inline Try<std::string> getHostname(const IP& ip)
{
- struct sockaddr_storage storage = createSockaddrStorage(ip, 0);
+ struct sockaddr_storage storage;
+ memset(&storage, 0, sizeof(storage));
+
+ switch (ip.family()) {
+ case AF_INET: {
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr = ip.in().get();
+ addr.sin_port = 0;
+
+ memcpy(&storage, &addr, sizeof(addr));
+ break;
+ }
+ default: {
+ ABORT("Unsupported family type: " + stringify(ip.family()));
+ }
+ }
+
char hostname[MAXHOSTNAMELEN];
int error = getnameinfo(
[11/14] mesos git commit: Added support for http::connect to take a
network::Address.
Posted by be...@apache.org.
Added support for http::connect to take a network::Address.
Review: https://reviews.apache.org/r/54112
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a4b5e62
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a4b5e62
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a4b5e62
Branch: refs/heads/master
Commit: 9a4b5e62486e9b1b0ee8913f8b1ae83b165ad333
Parents: 448b860
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 13:58:08 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:15:20 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 10 +++++-
3rdparty/libprocess/src/http.cpp | 37 +++++++++++++++++------
2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a4b5e62/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 08a8390..7894b31 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -849,7 +849,8 @@ public:
bool operator!=(const Connection& c) const { return !(*this == c); }
private:
- Connection(const network::inet::Socket& s);
+ Connection(const network::Socket& s);
+ friend Future<Connection> connect(const network::Address& address);
friend Future<Connection> connect(const URL&);
// Forward declaration.
@@ -859,6 +860,13 @@ private:
};
+// TODO(benh): Currently we don't support SSL for this version of
+// connect. We should support this, perhaps with an enum or a bool and
+// then update the `connect(URL)` variant to just call this function
+// instead.
+Future<Connection> connect(const network::Address& address);
+
+
Future<Connection> connect(const URL& url);
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a4b5e62/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 1f2de9d..03512c6 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1024,7 +1024,7 @@ Future<Response> convert(const Response& pipeResponse)
class ConnectionProcess : public Process<ConnectionProcess>
{
public:
- ConnectionProcess(const Socket& _socket)
+ ConnectionProcess(const network::Socket& _socket)
: ProcessBase(ID::generate("__http_connection__")),
socket(_socket),
sendChain(Nothing()),
@@ -1061,7 +1061,7 @@ public:
// We must chain the calls to Socket::send as it
// otherwise interleaves data across calls.
- Socket socket_ = socket;
+ network::Socket socket_ = socket;
sendChain = sendChain
.then([socket_, request]() {
@@ -1124,7 +1124,7 @@ protected:
}
private:
- static Future<Nothing> _send(Socket socket, Pipe::Reader reader)
+ static Future<Nothing> _send(network::Socket socket, Pipe::Reader reader)
{
return reader.read()
.then([socket, reader](const string& data) mutable -> Future<Nothing> {
@@ -1233,7 +1233,7 @@ private:
read();
}
- Socket socket;
+ network::Socket socket;
StreamingResponseDecoder decoder;
Future<Nothing> sendChain;
Promise<Nothing> disconnection;
@@ -1262,7 +1262,7 @@ struct Connection::Data
// on within a different execution context. More generally,
// we should be passing Process ownership to libprocess to
// ensure all interaction with a Process occurs through a PID.
- Data(const Socket& s)
+ Data(const network::Socket& s)
: process(spawn(new internal::ConnectionProcess(s), true)) {}
~Data()
@@ -1279,7 +1279,7 @@ struct Connection::Data
};
-Connection::Connection(const Socket& s)
+Connection::Connection(const network::Socket& s)
: data(std::make_shared<Connection::Data>(s)) {}
@@ -1312,6 +1312,20 @@ Future<Nothing> Connection::disconnected()
}
+Future<Connection> connect(const network::Address& address)
+{
+ Try<network::Socket> socket = network::Socket::create(address.family());
+ if (socket.isError()) {
+ return Failure("Failed to create socket: " + socket.error());
+ }
+
+ return socket->connect(address)
+ .then([socket]() {
+ return Connection(socket.get());
+ });
+}
+
+
Future<Connection> connect(const URL& url)
{
// TODO(bmahler): Move address resolution into the URL class?
@@ -1340,15 +1354,20 @@ Future<Connection> connect(const URL& url)
address.port = url.port.get();
- Try<Socket> socket = [&url]() -> Try<Socket> {
+ // TODO(benh): Reuse `connect(address)` once it supports SSL.
+ Try<network::Socket> socket = [&url]() -> Try<network::Socket> {
// Default to 'http' if no scheme was specified.
if (url.scheme.isNone() || url.scheme == string("http")) {
- return Socket::create(SocketImpl::Kind::POLL);
+ return network::Socket::create(
+ network::Address::Family::INET,
+ SocketImpl::Kind::POLL);
}
if (url.scheme == string("https")) {
#ifdef USE_SSL_SOCKET
- return Socket::create(SocketImpl::Kind::SSL);
+ return network::Socket::create(
+ network::Address::Family::INET,
+ SocketImpl::Kind::SSL);
#else
return Error("'https' scheme requires SSL enabled");
#endif
[10/14] mesos git commit: Removed unused Socket from Encoder.
Posted by be...@apache.org.
Removed unused Socket from Encoder.
Review: https://reviews.apache.org/r/54111
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/448b860b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/448b860b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/448b860b
Branch: refs/heads/master
Commit: 448b860b295267fc249e7a640bd275f474b2ff23
Parents: 0d0805e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 12:04:32 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:10:39 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/encoder.hpp | 26 +++++---------
3rdparty/libprocess/src/process.cpp | 37 ++++++++++----------
3rdparty/libprocess/src/tests/decoder_tests.cpp | 1 -
3 files changed, 28 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/448b860b/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 9667a62..1447d6f 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -46,7 +46,8 @@ public:
FILE
};
- explicit Encoder(const network::inet::Socket& _s) : s(_s) {}
+ Encoder() = default;
+
virtual ~Encoder() {}
virtual Kind kind() const = 0;
@@ -54,22 +55,14 @@ public:
virtual void backup(size_t length) = 0;
virtual size_t remaining() const = 0;
-
- network::inet::Socket socket() const
- {
- return s;
- }
-
-private:
- const network::inet::Socket s; // The socket this encoder is associated with.
};
class DataEncoder : public Encoder
{
public:
- DataEncoder(const network::inet::Socket& s, const std::string& _data)
- : Encoder(s), data(_data), index(0) {}
+ DataEncoder(const std::string& _data)
+ : data(_data), index(0) {}
virtual ~DataEncoder() {}
@@ -107,8 +100,8 @@ private:
class MessageEncoder : public DataEncoder
{
public:
- MessageEncoder(const network::inet::Socket& s, Message* _message)
- : DataEncoder(s, encode(_message)), message(_message) {}
+ MessageEncoder(Message* _message)
+ : DataEncoder(encode(_message)), message(_message) {}
virtual ~MessageEncoder()
{
@@ -162,10 +155,9 @@ class HttpResponseEncoder : public DataEncoder
{
public:
HttpResponseEncoder(
- const network::inet::Socket& s,
const http::Response& response,
const http::Request& request)
- : DataEncoder(s, encode(response, request)) {}
+ : DataEncoder(encode(response, request)) {}
static std::string encode(
const http::Response& response,
@@ -251,8 +243,8 @@ public:
class FileEncoder : public Encoder
{
public:
- FileEncoder(const network::inet::Socket& s, int _fd, size_t _size)
- : Encoder(s), fd(_fd), size(_size), index(0) {}
+ FileEncoder(int _fd, size_t _size)
+ : fd(_fd), size(_size), index(0) {}
virtual ~FileEncoder()
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/448b860b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 200cefc..889a034 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -386,7 +386,7 @@ public:
// This generally happens when `process::finalize` is called.
void unproxy(const Socket& socket);
- void send(Encoder* encoder, bool persist);
+ void send(Encoder* encoder, bool persist, const Socket& socket);
void send(const Response& response,
const Request& request,
const Socket& socket);
@@ -1492,13 +1492,15 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
// TODO(benh): Consider a way to have the socket manager turn
// on TCP_CORK for both sends and then turn it off.
socket_manager->send(
- new HttpResponseEncoder(socket, response, request),
- true);
+ new HttpResponseEncoder(response, request),
+ true,
+ socket);
// Note the file descriptor gets closed by FileEncoder.
socket_manager->send(
- new FileEncoder(socket, fd, s.st_size),
- request.keepAlive);
+ new FileEncoder(fd, s.st_size),
+ request.keepAlive,
+ socket);
}
}
} else if (response.type == Response::PIPE) {
@@ -1513,8 +1515,9 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
VLOG(3) << "Starting \"chunked\" streaming";
socket_manager->send(
- new HttpResponseEncoder(socket, response, request),
- true);
+ new HttpResponseEncoder(response, request),
+ true,
+ socket);
CHECK_SOME(response.reader);
http::Pipe::Reader reader = response.reader.get();
@@ -1569,8 +1572,9 @@ void HttpProxy::stream(
// Always persist the connection when streaming is not finished.
socket_manager->send(
- new DataEncoder(socket, out.str()),
- finished ? request->keepAlive : true);
+ new DataEncoder(out.str()),
+ finished ? request->keepAlive : true,
+ socket);
} else if (chunk.isFailed()) {
VLOG(1) << "Failed to read from stream: " << chunk.failure();
// TODO(bmahler): Have to close connection if headers were sent!
@@ -2033,12 +2037,11 @@ void _send(
} // namespace internal {
-void SocketManager::send(Encoder* encoder, bool persist)
+void SocketManager::send(Encoder* encoder, bool persist, const Socket& socket)
{
CHECK(encoder != nullptr);
synchronized (mutex) {
- Socket socket = encoder->socket();
if (sockets.count(socket) > 0) {
// Update whether or not this socket should get disposed after
// there is no more data to send.
@@ -2061,7 +2064,7 @@ void SocketManager::send(Encoder* encoder, bool persist)
}
if (encoder != nullptr) {
- internal::send(encoder, encoder->socket());
+ internal::send(encoder, socket);
}
}
@@ -2081,7 +2084,7 @@ void SocketManager::send(
}
}
- send(new HttpResponseEncoder(socket, response, request), persist);
+ send(new HttpResponseEncoder(response, request), persist, socket);
}
@@ -2150,7 +2153,7 @@ void SocketManager::send_connect(
return;
}
- Encoder* encoder = new MessageEncoder(socket, message);
+ Encoder* encoder = new MessageEncoder(message);
// Receive and ignore data from this socket. Note that we don't
// expect to receive anything other than HTTP '202 Accepted'
@@ -2195,7 +2198,7 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
}
if (outgoing.count(socket.get()) > 0) {
- outgoing[socket.get()].push(new MessageEncoder(socket.get(), message));
+ outgoing[socket.get()].push(new MessageEncoder(message));
return;
} else {
// Initialize the outgoing queue.
@@ -2244,9 +2247,7 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
} else {
// If we're not connecting and we haven't added the encoder to
// the 'outgoing' queue then schedule it to be sent.
- internal::send(
- new MessageEncoder(socket.get(), message),
- socket.get());
+ internal::send(new MessageEncoder(message), socket.get());
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/448b860b/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index 7356716..87563f4 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -17,7 +17,6 @@
#include <process/gtest.hpp>
#include <process/owned.hpp>
-#include <process/socket.hpp>
#include <stout/gtest.hpp>
[13/14] mesos git commit: Added support for specifying how a socket
should be shutdown.
Posted by be...@apache.org.
Added support for specifying how a socket should be shutdown.
Review: https://reviews.apache.org/r/54113
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4417a4e8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4417a4e8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4417a4e8
Branch: refs/heads/master
Commit: 4417a4e8917e17e70942a77bd796857978778888
Parents: b3013ff
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 12:10:14 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:19:17 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 27 ++++++++++++++++-----
1 file changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4417a4e8/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7f489e9..e9e89ee 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -183,11 +183,9 @@ public:
* Shutdown the receive-side of the socket. No further data can be
* received from the socket.
*/
- // TODO(neilc): Change this to allow the caller to specify `how`.
- // See MESOS-5658.
- virtual Try<Nothing> shutdown()
+ virtual Try<Nothing> shutdown(int how)
{
- if (::shutdown(s, SHUT_RD) < 0) {
+ if (::shutdown(s, how) < 0) {
return ErrnoError();
}
@@ -380,9 +378,26 @@ public:
return impl->send(data);
}
- Try<Nothing> shutdown()
+ enum class Shutdown
{
- return impl->shutdown();
+ READ,
+ WRITE,
+ READ_WRITE
+ };
+
+ // TODO(benh): Replace the default to Shutdown::READ_WRITE or remove
+ // altogether since it's unclear what the default should be.
+ Try<Nothing> shutdown(Shutdown shutdown = Shutdown::READ)
+ {
+ int how = [&]() {
+ switch (shutdown) {
+ case Shutdown::READ: return SHUT_RD;
+ case Shutdown::WRITE: return SHUT_WR;
+ case Shutdown::READ_WRITE: return SHUT_RDWR;
+ }
+ }();
+
+ return impl->shutdown(how);
}
// Support implicit conversion of any `Socket<AddressType>` to a
[04/14] mesos git commit: Refactored `Socket` to support a templated
version.
Posted by be...@apache.org.
Refactored `Socket` to support a templated version.
The templated `Socket` will enable us to provide type safe
functionality for different addresses, ultimately `inet::Address` and
`unix::Address` (and likely eventually `inet6::Address` as well).
Also introduced the `inet` namespace and updated the code to
explicitly use `inet::Address` and `inet::Socket`.
Review: https://reviews.apache.org/r/53459
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a77817e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a77817e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a77817e
Branch: refs/heads/master
Commit: 6a77817e5ea62e7a779a350c25f351b8915c6385
Parents: 7aa6849
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 2 23:08:55 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/address.hpp | 4 +
.../libprocess/include/process/firewall.hpp | 4 +-
3rdparty/libprocess/include/process/http.hpp | 10 +-
3rdparty/libprocess/include/process/pid.hpp | 7 +-
3rdparty/libprocess/include/process/process.hpp | 2 +-
3rdparty/libprocess/include/process/socket.hpp | 400 +++++++++++--------
.../libprocess/include/process/ssl/gtest.hpp | 17 +-
3rdparty/libprocess/src/encoder.hpp | 14 +-
3rdparty/libprocess/src/http.cpp | 10 +-
3rdparty/libprocess/src/libevent_ssl_socket.cpp | 22 +-
3rdparty/libprocess/src/libevent_ssl_socket.hpp | 16 +-
3rdparty/libprocess/src/pid.cpp | 2 +-
3rdparty/libprocess/src/poll_socket.cpp | 10 +-
3rdparty/libprocess/src/poll_socket.hpp | 15 +-
3rdparty/libprocess/src/process.cpp | 22 +-
3rdparty/libprocess/src/socket.cpp | 117 +++---
3rdparty/libprocess/src/tests/decoder_tests.cpp | 6 +-
3rdparty/libprocess/src/tests/http_tests.cpp | 2 +-
3rdparty/libprocess/src/tests/process_tests.cpp | 4 +-
3rdparty/libprocess/src/tests/ssl_client.cpp | 10 +-
3rdparty/libprocess/src/tests/ssl_tests.cpp | 16 +-
3rdparty/libprocess/src/tests/test_linkee.cpp | 4 +-
22 files changed, 391 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 04e3155..5fd8ac4 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -144,6 +144,10 @@ inline std::ostream& operator<<(std::ostream& stream, const Address& address)
return stream;
}
+namespace inet {
+using Address = network::Address;
+} // namespace inet {
+
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/firewall.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/firewall.hpp b/3rdparty/libprocess/include/process/firewall.hpp
index ad461ca..0a7b985 100644
--- a/3rdparty/libprocess/include/process/firewall.hpp
+++ b/3rdparty/libprocess/include/process/firewall.hpp
@@ -55,7 +55,7 @@ public:
* for failure. Otherwise an unset 'Option' object.
*/
virtual Option<http::Response> apply(
- const network::Socket& socket,
+ const network::inet::Socket& socket,
const http::Request& request) = 0;
};
@@ -75,7 +75,7 @@ public:
virtual ~DisabledEndpointsFirewallRule() {}
virtual Option<http::Response> apply(
- const network::Socket&,
+ const network::inet::Socket&,
const http::Request& request)
{
if (paths.contains(request.url.path)) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index a684e09..d464b9f 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -30,6 +30,7 @@
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
+#include <process/socket.hpp>
#include <stout/error.hpp>
#include <stout/hashmap.hpp>
@@ -49,12 +50,7 @@ namespace process {
template <typename T>
class Future;
-namespace network {
-class Socket;
-} // namespace network {
-
namespace http {
-
namespace authentication {
class Authenticator;
@@ -451,7 +447,7 @@ struct Request
// For server requests, this contains the address of the client.
// Note that this may correspond to a proxy or load balancer address.
- network::Address client;
+ network::inet::Address client;
// Clients can choose to provide the entire body at once
// via BODY or can choose to stream the body over to the
@@ -853,7 +849,7 @@ public:
bool operator!=(const Connection& c) const { return !(*this == c); }
private:
- Connection(const network::Socket& s);
+ Connection(const network::inet::Socket& s);
friend Future<Connection> connect(const URL&);
// Forward declaration.
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 36453b6..b274c55 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -46,13 +46,13 @@ struct UPID
UPID(const char* id_, const net::IP& ip_, uint16_t port_)
: id(id_), address(ip_, port_) {}
- UPID(const char* id_, const network::Address& address_)
+ UPID(const char* id_, const network::inet::Address& address_)
: id(id_), address(address_) {}
UPID(const std::string& id_, const net::IP& ip_, uint16_t port_)
: id(id_), address(ip_, port_) {}
- UPID(const std::string& id_, const network::Address& address_)
+ UPID(const std::string& id_, const network::inet::Address& address_)
: id(id_), address(address_) {}
/*implicit*/ UPID(const char* s);
@@ -91,8 +91,9 @@ struct UPID
{
return !(*this == that);
}
+
std::string id;
- network::Address address;
+ network::inet::Address address;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index de23f0c..1b066b2 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -539,7 +539,7 @@ std::string absolutePath(const std::string& path);
/**
* Returns the socket address associated with this instance of the library.
*/
-network::Address address();
+network::inet::Address address();
/**
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index a70954a..087a072 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -32,24 +32,32 @@
namespace process {
namespace network {
+namespace internal {
/**
- * An abstraction around a socket (file descriptor).
+ * Implementation interface for a `Socket`.
*
- * Provides reference counting such that the socket is only closed
- * (and thus, has the possiblity of being reused) after there are no
- * more references.
+ * Each socket is:
+ * - reference counted,
+ * - shared by default,
+ * - and a concurrent object.
+ *
+ * Multiple implementations are supported via the Pimpl pattern,
+ * rather than forcing each Socket implementation to do this themselves.
+ *
+ * @see process::network::Socket
+ * @see [Pimpl pattern](https://en.wikipedia.org/wiki/Opaque_pointer)
*/
-class Socket
+class SocketImpl : public std::enable_shared_from_this<SocketImpl>
{
public:
/**
* Available kinds of implementations.
*
- * @see process::network::PollSocketImpl
- * @see process::network::LibeventSSLSocketImpl
+ * @see process::network::internal::PollSocketImpl
+ * @see process::network::internal::LibeventSSLSocketImpl
*/
- enum Kind
+ enum class Kind
{
POLL,
#ifdef USE_SSL_SOCKET
@@ -58,180 +66,222 @@ public:
};
/**
- * Returns an instance of a `Socket` using the specified kind of
- * implementation. All implementations will set the NONBLOCK and
- * CLOEXEC options on the returned socket.
+ * Returns the default `Kind` of implementation.
+ */
+ static Kind DEFAULT_KIND();
+
+ /**
+ * Returns an instance of a `SocketImpl` using the specified kind of
+ * implementation.
*
- * @param kind Optional. The desired `Socket` implementation.
- * @param s Optional. The file descriptor to wrap with the `Socket`.
+ * @param s The existing file descriptor to use.
+ * @param kind Optional. The desired implementation.
*
- * @return An instance of a `Socket`.
+ * @return An instance of a `SocketImpl`.
+ */
+ static Try<std::shared_ptr<SocketImpl>> create(
+ int s,
+ Kind kind = DEFAULT_KIND());
+
+ /**
+ * Returns an instance of a `SocketImpl` using the specified kind of
+ * implementation. The NONBLOCK and CLOEXEC options will be set on
+ * the underlying file descriptor for the socket.
*
- * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
- * configurable by the caller of the interface.
+ * @param kind Optional. The desired implementation.
+ *
+ * @return An instance of a `SocketImpl`.
*/
- static Try<Socket> create(Kind kind = DEFAULT_KIND(), Option<int> s = None());
+ // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
+ // configurable by the caller of the interface.
+ static Try<std::shared_ptr<SocketImpl>> create(
+ Kind kind = DEFAULT_KIND());
+
+ virtual ~SocketImpl()
+ {
+ // Don't close if the socket was released.
+ if (s >= 0) {
+ CHECK_SOME(os::close(s)) << "Failed to close socket";
+ }
+ }
/**
- * Returns the default `Kind` of implementation of `Socket`.
+ * Returns the file descriptor wrapped by this implementation.
+ */
+ int get() const
+ {
+ return s;
+ }
+
+ /**
+ * @copydoc network::address
+ */
+ Try<Address> address() const;
+
+ /**
+ * @copydoc network::peer
+ */
+ Try<Address> peer() const;
+
+ /**
+ * Assigns the specified address to the socket.
*
- * @see process::network::Socket::Kind
+ * @return The assigned `Address` or an error if the bind system
+ * call fails.
*/
- static Kind DEFAULT_KIND();
+ Try<Address> bind(const Address& address);
+
+ virtual Try<Nothing> listen(int backlog) = 0;
/**
- * Returns the kind representing the underlying implementation
- * of the `Socket` instance.
+ * Returns an implementation corresponding to the next pending
+ * connection for the listening socket. All implementations will set
+ * the NONBLOCK and CLOEXEC options on the returned socket.
*
- * @see process::network::Socket::Kind
+ * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
+ * configurable by the caller of the interface.
*/
- Kind kind() const { return impl->kind(); }
+ virtual Future<std::shared_ptr<SocketImpl>> accept() = 0;
+
+ virtual Future<Nothing> connect(const Address& address) = 0;
+ virtual Future<size_t> recv(char* data, size_t size) = 0;
+ virtual Future<size_t> send(const char* data, size_t size) = 0;
+ virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
/**
- * Interface for a `Socket`.
+ * An overload of `recv`, which receives data based on the specified
+ * 'size' parameter.
*
- * Each socket is:
- * - reference counted,
- * - shared by default,
- * - and a concurrent object.
+ * @param size
+ * Value | Semantics
+ * :-------:|-----------
+ * 0 | Returns an empty string.
+ * -1 | Receives until EOF.
+ * N | Returns a string of size N.
+ * 'None' | Returns a string of the available data.
+ * If 'None' is specified, whenever data becomes available on the
+ * socket, that much data will be returned.
+ */
+ // TODO(benh): Consider returning Owned<std::string> or
+ // Shared<std::string>, the latter enabling reuse of a pool of
+ // preallocated strings/buffers.
+ virtual Future<std::string> recv(const Option<ssize_t>& size = None());
+
+ /**
+ * An overload of `send`, which sends all of the specified data.
*
- * Multiple implementations are supported via the Pimpl pattern,
- * rather than forcing each Socket implementation to do this themselves.
+ * @param data The specified data to send.
*
- * @see process::network::Socket
- * @see [Pimpl pattern](https://en.wikipedia.org/wiki/Opaque_pointer)
+ * @return Nothing or an error in case the sending fails.
+ */
+ // TODO(benh): Consider taking Shared<std::string>, the latter
+ // enabling reuse of a pool of preallocated strings/buffers.
+ virtual Future<Nothing> send(const std::string& data);
+
+ /**
+ * Shutdown the receive-side of the socket. No further data can be
+ * received from the socket.
*/
- class Impl : public std::enable_shared_from_this<Impl>
+ // TODO(neilc): Change this to allow the caller to specify `how`.
+ // See MESOS-5658.
+ virtual Try<Nothing> shutdown()
{
- public:
- virtual ~Impl()
- {
- // Don't close if the socket was released.
- if (s >= 0) {
- CHECK_SOME(os::close(s)) << "Failed to close socket";
- }
+ if (::shutdown(s, SHUT_RD) < 0) {
+ return ErrnoError();
}
- /**
- * Returns the file descriptor wrapped by the `Socket`.
- */
- int get() const
- {
- return s;
- }
+ return Nothing();
+ }
- /**
- * @copydoc network::address
- */
- Try<Address> address() const;
-
- /**
- * @copydoc network::peer
- */
- Try<Address> peer() const;
-
- /**
- * Assigns the specified address to the `Socket`.
- *
- * @return The assigned `Address` or an error if the bind system
- * call fails.
- */
- Try<Address> bind(const Address& address);
-
- virtual Try<Nothing> listen(int backlog) = 0;
-
- /**
- * Returns a socket corresponding to the next pending connection
- * for the listening socket. All implementations will set the
- * NONBLOCK and CLOEXEC options on the returned socket.
- *
- * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
- * configurable by the caller of the interface.
- */
- virtual Future<std::shared_ptr<Impl>> accept() = 0;
-
- virtual Future<Nothing> connect(const Address& address) = 0;
- virtual Future<size_t> recv(char* data, size_t size) = 0;
- virtual Future<size_t> send(const char* data, size_t size) = 0;
- virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
-
- /**
- * An overload of `recv`, which receives data based on the specified
- * 'size' parameter.
- *
- * @param size
- * Value | Semantics
- * :-------:|-----------
- * 0 | Returns an empty string.
- * -1 | Receives until EOF.
- * N | Returns a string of size N.
- * 'None' | Returns a string of the available data.
- * If 'None' is specified, whenever data becomes available on the
- * socket, that much data will be returned.
- */
- // TODO(benh): Consider returning Owned<std::string> or
- // Shared<std::string>, the latter enabling reuse of a pool of
- // preallocated strings/buffers.
- virtual Future<std::string> recv(const Option<ssize_t>& size = None());
-
- /**
- * An overload of `send`, which sends all of the specified data.
- *
- * @param data The specified data to send.
- *
- * @return Nothing or an error in case the sending fails.
- */
- // TODO(benh): Consider taking Shared<std::string>, the latter
- // enabling reuse of a pool of preallocated strings/buffers.
- virtual Future<Nothing> send(const std::string& data);
-
- /**
- * Shutdown the receive-side of the socket. No further data can be
- * received from the socket.
- */
- // TODO(neilc): Change this to allow the caller to specify `how`.
- // See MESOS-5658.
- virtual Try<Nothing> shutdown()
- {
- if (::shutdown(s, SHUT_RD) < 0) {
- return ErrnoError();
- }
-
- return Nothing();
- }
+ virtual Kind kind() const = 0;
- virtual Socket::Kind kind() const = 0;
-
- protected:
- explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
-
- /**
- * Releases ownership of the file descriptor. Not exposed
- * via the `Socket` interface as this is only intended to
- * support `Socket::Impl` implementations that need to
- * override the file descriptor ownership.
- */
- int release()
- {
- int released = s;
- s = -1;
- return released;
- }
+protected:
+ explicit SocketImpl(int _s) : s(_s) { CHECK(s >= 0); }
+
+ /**
+ * Releases ownership of the file descriptor. Not exposed
+ * via the `Socket` interface as this is only intended to
+ * support `SocketImpl` implementations that need to
+ * override the file descriptor ownership.
+ */
+ int release()
+ {
+ int released = s;
+ s = -1;
+ return released;
+ }
- /**
- * Returns a `std::shared_ptr<T>` from this implementation.
- */
- template <typename T>
- static std::shared_ptr<T> shared(T* t)
- {
- std::shared_ptr<T> pointer =
- std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
- CHECK(pointer);
- return pointer;
+ /**
+ * Returns a `std::shared_ptr<T>` from this implementation.
+ */
+ template <typename T>
+ static std::shared_ptr<T> shared(T* t)
+ {
+ std::shared_ptr<T> pointer =
+ std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
+ CHECK(pointer);
+ return pointer;
+ }
+
+ int s;
+};
+
+
+/**
+ * An abstraction around a socket (file descriptor).
+ *
+ * Provides reference counting such that the socket is only closed
+ * (and thus, has the possibility of being reused) after there are no
+ * more references.
+ */
+template <typename AddressType>
+class Socket
+{
+public:
+ /**
+ * Returns an instance of a `Socket` using the specified kind of
+ * implementation.
+ *
+ * @param s The file descriptor to wrap with the `Socket`.
+ * @param kind Optional. The desired `Socket` implementation.
+ *
+ * @return An instance of a `Socket`.
+ */
+ static Try<Socket> create(
+ int s,
+ SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND())
+ {
+ Try<std::shared_ptr<SocketImpl>> impl = SocketImpl::create(s, kind);
+ if (impl.isError()) {
+ return Error(impl.error());
}
+ return Socket(impl.get());
+ }
- int s;
- };
+ /**
+ * Returns an instance of a `Socket` using `AddressType` to determine
+ * the address family to use. An optional implementation kind can be
+ * specified. The NONBLOCK and CLOEXEC options will be set on the
+ * underlying file descriptor for the socket.
+ *
+ * @param kind Optional. The desired `Socket` implementation.
+ *
+ * @return An instance of a `Socket`.
+ */
+ // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
+ // configurable by the caller of the interface.
+ static Try<Socket> create(SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND());
+
+ /**
+ * Returns the kind representing the underlying implementation
+ * of the `Socket` instance.
+ *
+ * @see process::network::internal::SocketImpl::Kind
+ */
+ SocketImpl::Kind kind() const
+ {
+ return impl->kind();
+ }
bool operator==(const Socket& that) const
{
@@ -243,12 +293,12 @@ public:
return impl->get();
}
- Try<Address> address() const
+ Try<AddressType> address() const
{
return impl->address();
}
- Try<Address> peer() const
+ Try<AddressType> peer() const
{
return impl->peer();
}
@@ -258,7 +308,7 @@ public:
return impl->get();
}
- Try<Address> bind(const Address& address = Address::LOCALHOST_ANY())
+ Try<AddressType> bind(const AddressType& address)
{
return impl->bind(address);
}
@@ -272,12 +322,12 @@ public:
{
return impl->accept()
// TODO(benh): Use && for `impl` here!
- .then([](const std::shared_ptr<Impl>& impl) {
+ .then([](const std::shared_ptr<SocketImpl>& impl) {
return Socket(impl);
});
}
- Future<Nothing> connect(const Address& address)
+ Future<Nothing> connect(const AddressType& address)
{
return impl->connect(address);
}
@@ -312,14 +362,36 @@ public:
return impl->shutdown();
}
+ // Support implicit conversion of any `Socket<AddressType>` to a
+ // `Socket<network::Address>`.
+ operator Socket<network::Address>() const
+ {
+ return Socket<network::Address>(impl);
+ }
+
private:
- explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+ // Necessary to support the implicit conversion operator from any
+ // `Socket<AddressType>` to `Socket<network::Address>`.
+ template <typename T>
+ friend class Socket;
+
+ explicit Socket(std::shared_ptr<SocketImpl>&& that) : impl(std::move(that)) {}
- explicit Socket(const std::shared_ptr<Impl>& that) : impl(that) {}
+ explicit Socket(const std::shared_ptr<SocketImpl>& that) : impl(that) {}
- std::shared_ptr<Impl> impl;
+ std::shared_ptr<SocketImpl> impl;
};
+} // namespace internal {
+
+
+using Socket = network::internal::Socket<network::Address>;
+
+namespace inet {
+
+using Socket = network::internal::Socket<inet::Address>;
+
+} // namespace inet {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/ssl/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/ssl/gtest.hpp b/3rdparty/libprocess/include/process/ssl/gtest.hpp
index 21a0fc4..8649da1 100644
--- a/3rdparty/libprocess/include/process/ssl/gtest.hpp
+++ b/3rdparty/libprocess/include/process/ssl/gtest.hpp
@@ -287,24 +287,25 @@ protected:
*
* @return Socket if successful otherwise an Error.
*/
- Try<process::network::Socket> setup_server(
+ Try<process::network::inet::Socket> setup_server(
const std::map<std::string, std::string>& environment)
{
set_environment_variables(environment);
- const Try<process::network::Socket> create =
- process::network::Socket::create(process::network::Socket::SSL);
+ const Try<process::network::inet::Socket> create =
+ process::network::inet::Socket::create(
+ process::network::internal::SocketImpl::Kind::SSL);
if (create.isError()) {
return Error(create.error());
}
- process::network::Socket server = create.get();
+ process::network::inet::Socket server = create.get();
// We need to explicitly bind to INADDR_LOOPBACK so the
// certificate we create in this test fixture can be verified.
- Try<process::network::Address> bind =
- server.bind(process::network::Address(net::IP(INADDR_LOOPBACK), 0));
+ Try<process::network::inet::Address> bind =
+ server.bind(process::network::inet::Address(net::IP(INADDR_LOOPBACK), 0));
if (bind.isError()) {
return Error(bind.error());
@@ -334,10 +335,10 @@ protected:
*/
Try<process::Subprocess> launch_client(
const std::map<std::string, std::string>& environment,
- const process::network::Socket& server,
+ const process::network::inet::Socket& server,
bool use_ssl_socket)
{
- const Try<process::network::Address> address = server.address();
+ const Try<process::network::inet::Address> address = server.address();
if (address.isError()) {
return Error(address.error());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 515821a..9667a62 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -46,7 +46,7 @@ public:
FILE
};
- explicit Encoder(const network::Socket& _s) : s(_s) {}
+ explicit Encoder(const network::inet::Socket& _s) : s(_s) {}
virtual ~Encoder() {}
virtual Kind kind() const = 0;
@@ -55,20 +55,20 @@ public:
virtual size_t remaining() const = 0;
- network::Socket socket() const
+ network::inet::Socket socket() const
{
return s;
}
private:
- const network::Socket s; // The socket this encoder is associated with.
+ const network::inet::Socket s; // The socket this encoder is associated with.
};
class DataEncoder : public Encoder
{
public:
- DataEncoder(const network::Socket& s, const std::string& _data)
+ DataEncoder(const network::inet::Socket& s, const std::string& _data)
: Encoder(s), data(_data), index(0) {}
virtual ~DataEncoder() {}
@@ -107,7 +107,7 @@ private:
class MessageEncoder : public DataEncoder
{
public:
- MessageEncoder(const network::Socket& s, Message* _message)
+ MessageEncoder(const network::inet::Socket& s, Message* _message)
: DataEncoder(s, encode(_message)), message(_message) {}
virtual ~MessageEncoder()
@@ -162,7 +162,7 @@ class HttpResponseEncoder : public DataEncoder
{
public:
HttpResponseEncoder(
- const network::Socket& s,
+ const network::inet::Socket& s,
const http::Response& response,
const http::Request& request)
: DataEncoder(s, encode(response, request)) {}
@@ -251,7 +251,7 @@ public:
class FileEncoder : public Encoder
{
public:
- FileEncoder(const network::Socket& s, int _fd, size_t _size)
+ FileEncoder(const network::inet::Socket& s, int _fd, size_t _size)
: Encoder(s), fd(_fd), size(_size), index(0) {}
virtual ~FileEncoder()
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 3f16f29..93a8a80 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -66,8 +66,10 @@ using std::vector;
using process::http::Request;
using process::http::Response;
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
+
+using process::network::internal::SocketImpl;
namespace process {
namespace http {
@@ -1341,12 +1343,12 @@ Future<Connection> connect(const URL& url)
Try<Socket> socket = [&url]() -> Try<Socket> {
// Default to 'http' if no scheme was specified.
if (url.scheme.isNone() || url.scheme == string("http")) {
- return Socket::create(Socket::POLL);
+ return Socket::create(SocketImpl::Kind::POLL);
}
if (url.scheme == string("https")) {
#ifdef USE_SSL_SOCKET
- return Socket::create(Socket::SSL);
+ return Socket::create(SocketImpl::Kind::SSL);
#else
return Error("'https' scheme requires SSL enabled");
#endif
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 9cade79..f9551bf 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -78,8 +78,9 @@ static Synchronized<bufferevent> synchronize(bufferevent* bev)
namespace process {
namespace network {
+namespace internal {
-Try<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::create(int s)
+Try<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::create(int s)
{
openssl::initialize();
@@ -437,7 +438,7 @@ void LibeventSSLSocketImpl::event_callback(short events)
LibeventSSLSocketImpl::LibeventSSLSocketImpl(int _s)
- : Socket::Impl(_s),
+ : SocketImpl(_s),
bev(nullptr),
listener(nullptr),
recv_request(nullptr),
@@ -450,7 +451,7 @@ LibeventSSLSocketImpl::LibeventSSLSocketImpl(
int _s,
bufferevent* _bev,
Option<string>&& _peer_hostname)
- : Socket::Impl(_s),
+ : SocketImpl(_s),
bev(_bev),
listener(nullptr),
recv_request(nullptr),
@@ -899,13 +900,13 @@ Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
}
-Future<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::accept()
+Future<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::accept()
{
// We explicitly specify the return type to avoid a type deduction
// issue in some versions of clang. See MESOS-2943.
return accept_queue.get()
- .then([](const Future<std::shared_ptr<Socket::Impl>>& impl)
- -> Future<std::shared_ptr<Socket::Impl>> {
+ .then([](const Future<std::shared_ptr<SocketImpl>>& impl)
+ -> Future<std::shared_ptr<SocketImpl>> {
CHECK(!impl.isPending());
return impl;
});
@@ -967,7 +968,7 @@ void LibeventSSLSocketImpl::peek_callback(
accept_SSL_callback(request);
} else {
// Downgrade to a non-SSL socket implementation.
- Try<std::shared_ptr<Socket::Impl>> impl = PollSocketImpl::create(fd);
+ Try<std::shared_ptr<SocketImpl>> impl = PollSocketImpl::create(fd);
if (impl.isError()) {
request->promise.fail(impl.error());
} else {
@@ -983,13 +984,13 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
{
CHECK(__in_event_loop__);
- Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue_ = accept_queue;
+ Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue_ = accept_queue;
// After the socket is accepted, it must complete the SSL
// handshake (or be downgraded to a regular socket) before
// we put it in the queue of connected sockets.
request->promise.future()
- .onAny([accept_queue_](Future<std::shared_ptr<Socket::Impl>> impl) mutable {
+ .onAny([accept_queue_](Future<std::shared_ptr<SocketImpl>> impl) mutable {
accept_queue_.put(impl);
});
@@ -1121,7 +1122,7 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
&LibeventSSLSocketImpl::event_callback,
CHECK_NOTNULL(impl->event_loop_handle));
- request->promise.set(std::dynamic_pointer_cast<Socket::Impl>(impl));
+ request->promise.set(std::dynamic_pointer_cast<SocketImpl>(impl));
} else if (events & BEV_EVENT_ERROR) {
std::ostringstream stream;
if (EVUTIL_SOCKET_ERROR() != 0) {
@@ -1158,5 +1159,6 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
request);
}
+} // namespace internal {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index ed53976..57eaf4f 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -27,26 +27,27 @@
namespace process {
namespace network {
+namespace internal {
-class LibeventSSLSocketImpl : public Socket::Impl
+class LibeventSSLSocketImpl : public SocketImpl
{
public:
// See 'Socket::create()'.
- static Try<std::shared_ptr<Socket::Impl>> create(int s);
+ static Try<std::shared_ptr<SocketImpl>> create(int s);
LibeventSSLSocketImpl(int _s);
virtual ~LibeventSSLSocketImpl();
- // Implement 'Socket::Impl' interface.
+ // Implement 'SocketImpl' interface.
virtual Future<Nothing> connect(const Address& address);
virtual Future<size_t> recv(char* data, size_t size);
// Send does not currently support discard. See implementation.
virtual Future<size_t> send(const char* data, size_t size);
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
virtual Try<Nothing> listen(int backlog);
- virtual Future<std::shared_ptr<Socket::Impl>> accept();
- virtual Socket::Kind kind() const { return Socket::SSL; }
+ virtual Future<std::shared_ptr<SocketImpl>> accept();
+ virtual SocketImpl::Kind kind() const { return SocketImpl::Kind::SSL; }
// This call is used to do the equivalent of shutting down the read
// end. This means finishing the future of any outstanding read
@@ -74,7 +75,7 @@ private:
socket(_socket),
ip(_ip) {}
event* peek_event;
- Promise<std::shared_ptr<Socket::Impl>> promise;
+ Promise<std::shared_ptr<SocketImpl>> promise;
evconnlistener* listener;
int socket;
Option<net::IP> ip;
@@ -175,12 +176,13 @@ private:
// downgraded). The 'accept()' call returns sockets from this queue.
// We wrap the socket in a 'Future' so that we can pass failures or
// discards through.
- Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue;
+ Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
Option<std::string> peer_hostname;
Option<net::IP> peer_ip;
};
+} // namespace internal {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index f9313cd..ffb1021 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -101,7 +101,7 @@ istream& operator>>(istream& stream, UPID& pid)
string id;
string host;
- network::Address address;
+ network::inet::Address address;
size_t index = str.find('@');
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index ff06e56..9184ea3 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -32,8 +32,9 @@ using std::string;
namespace process {
namespace network {
+namespace internal {
-Try<std::shared_ptr<Socket::Impl>> PollSocketImpl::create(int s)
+Try<std::shared_ptr<SocketImpl>> PollSocketImpl::create(int s)
{
return std::make_shared<PollSocketImpl>(s);
}
@@ -97,12 +98,12 @@ Future<int> accept(int fd)
} // namespace internal {
-Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
+Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
{
return io::poll(get(), io::READ)
.then(lambda::bind(&internal::accept, get()))
- .then([](int s) -> Future<std::shared_ptr<Socket::Impl>> {
- Try<std::shared_ptr<Socket::Impl>> impl = create(s);
+ .then([](int s) -> Future<std::shared_ptr<SocketImpl>> {
+ Try<std::shared_ptr<SocketImpl>> impl = create(s);
if (impl.isError()) {
os::close(s);
return Failure("Failed to create socket: " + impl.error());
@@ -276,5 +277,6 @@ Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
size));
}
+} // namespace internal {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index 3ba3678..89789e6 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -18,26 +18,27 @@
namespace process {
namespace network {
+namespace internal {
-class PollSocketImpl : public Socket::Impl
+class PollSocketImpl : public SocketImpl
{
public:
- static Try<std::shared_ptr<Socket::Impl>> create(int s);
+ static Try<std::shared_ptr<SocketImpl>> create(int s);
- PollSocketImpl(int s) : Socket::Impl(s) {}
+ PollSocketImpl(int s) : SocketImpl(s) {}
virtual ~PollSocketImpl() {}
- // Implementation of the Socket::Impl interface.
+ // Implementation of the SocketImpl interface.
virtual Try<Nothing> listen(int backlog);
- virtual Future<std::shared_ptr<Socket::Impl>> accept();
+ virtual Future<std::shared_ptr<SocketImpl>> accept();
virtual Future<Nothing> connect(const Address& address);
virtual Future<size_t> recv(char* data, size_t size);
virtual Future<size_t> send(const char* data, size_t size);
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
-
- virtual Socket::Kind kind() const { return Socket::POLL; }
+ virtual Kind kind() const { return SocketImpl::Kind::POLL; }
};
+} // namespace internal {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e9a4bbb..a07d5e3 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -134,8 +134,10 @@ using process::http::authentication::AuthenticatorManager;
using process::http::authorization::AuthorizationCallbacks;
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
+
+using process::network::internal::SocketImpl;
using std::deque;
using std::find;
@@ -371,7 +373,7 @@ public:
void link(ProcessBase* process,
const UPID& to,
const ProcessBase::RemoteConnection remote,
- const Socket::Kind& kind = Socket::DEFAULT_KIND());
+ const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
// Test-only method to fetch the file descriptor behind a
// persistent socket.
@@ -389,7 +391,7 @@ public:
const Request& request,
const Socket& socket);
void send(Message* message,
- const Socket::Kind& kind = Socket::DEFAULT_KIND());
+ const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
Encoder* next(int s);
@@ -1683,7 +1685,7 @@ void SocketManager::link_connect(
future.isFailed() &&
network::openssl::flags().enabled &&
network::openssl::flags().support_downgrade &&
- socket.kind() == Socket::SSL;
+ socket.kind() == SocketImpl::Kind::SSL;
Option<Socket> poll_socket = None();
@@ -1699,7 +1701,7 @@ void SocketManager::link_connect(
return;
}
- Try<Socket> create = Socket::create(Socket::POLL);
+ Try<Socket> create = Socket::create(SocketImpl::Kind::POLL);
if (create.isError()) {
VLOG(1) << "Failed to link, create socket: " << create.error();
socket_manager->close(socket);
@@ -1779,7 +1781,7 @@ void SocketManager::link(
ProcessBase* process,
const UPID& to,
const ProcessBase::RemoteConnection remote,
- const Socket::Kind& kind)
+ const SocketImpl::Kind& kind)
{
// TODO(benh): The semantics we want to support for link are such
// that if there is nobody to link to (local or remote) then an
@@ -2101,7 +2103,7 @@ void SocketManager::send_connect(
future.isFailed() &&
network::openssl::flags().enabled &&
network::openssl::flags().support_downgrade &&
- socket.kind() == Socket::SSL;
+ socket.kind() == SocketImpl::Kind::SSL;
Option<Socket> poll_socket = None();
@@ -2109,7 +2111,7 @@ void SocketManager::send_connect(
// POLL socket.
if (attempt_downgrade) {
synchronized (mutex) {
- Try<Socket> create = Socket::create(Socket::POLL);
+ Try<Socket> create = Socket::create(SocketImpl::Kind::POLL);
if (create.isError()) {
VLOG(1) << "Failed to link, create socket: " << create.error();
socket_manager->close(socket);
@@ -2168,7 +2170,7 @@ void SocketManager::send_connect(
}
-void SocketManager::send(Message* message, const Socket::Kind& kind)
+void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
{
CHECK(message != nullptr);
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 8f372aa..0b7631d 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -32,89 +32,72 @@ using std::string;
namespace process {
namespace network {
+namespace internal {
-Try<Socket> Socket::create(Kind kind, Option<int> s)
+Try<std::shared_ptr<SocketImpl>> SocketImpl::create(int s, Kind kind)
{
- // If the caller passed in a file descriptor, we do
- // not own its life cycle and must not close it.
- bool owned = s.isNone();
+ switch (kind) {
+ case Kind::POLL:
+ return PollSocketImpl::create(s);
+#ifdef USE_SSL_SOCKET
+ case Kind::SSL:
+ return LibeventSSLSocketImpl::create(s);
+#endif
+ }
+}
+
- if (owned) {
- // Supported in Linux >= 2.6.27.
+Try<std::shared_ptr<SocketImpl>> SocketImpl::create(Kind kind)
+{
+ // Supported in Linux >= 2.6.27.
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
- Try<int> fd =
- network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+ Try<int> s =
+ network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
- if (fd.isError()) {
- return Error("Failed to create socket: " + fd.error());
- }
+ if (s.isError()) {
+ return Error("Failed to create socket: " + s.error());
+ }
#else
- Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
- if (fd.isError()) {
- return Error("Failed to create socket: " + fd.error());
- }
-
- Try<Nothing> nonblock = os::nonblock(fd.get());
- if (nonblock.isError()) {
- os::close(fd.get());
- return Error("Failed to create socket, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(fd.get());
- if (cloexec.isError()) {
- os::close(fd.get());
- return Error("Failed to create socket, cloexec: " + cloexec.error());
- }
-#endif
+ Try<int> s = network::socket(AF_INET, SOCK_STREAM, 0);
+ if (s.isError()) {
+ return Error("Failed to create socket: " + s.error());
+ }
- s = fd.get();
+ Try<Nothing> nonblock = os::nonblock(s.get());
+ if (nonblock.isError()) {
+ os::close(s.get());
+ return Error("Failed to create socket, nonblock: " + nonblock.error());
}
- switch (kind) {
- case POLL: {
- Try<std::shared_ptr<Socket::Impl>> socket =
- PollSocketImpl::create(s.get());
- if (socket.isError()) {
- if (owned) {
- os::close(s.get());
- }
- return Error(socket.error());
- }
- return Socket(socket.get());
- }
-#ifdef USE_SSL_SOCKET
- case SSL: {
- Try<std::shared_ptr<Socket::Impl>> socket =
- LibeventSSLSocketImpl::create(s.get());
- if (socket.isError()) {
- if (owned) {
- os::close(s.get());
- }
- return Error(socket.error());
- }
- return Socket(socket.get());
- }
+ Try<Nothing> cloexec = os::cloexec(s.get());
+ if (cloexec.isError()) {
+ os::close(s.get());
+ return Error("Failed to create socket, cloexec: " + cloexec.error());
+ }
#endif
- // By not setting a default we leverage the compiler errors when
- // the enumeration is augmented to find all the cases we need to
- // provide.
+
+ Try<std::shared_ptr<SocketImpl>> impl = create(s.get(), kind);
+ if (impl.isError()) {
+ os::close(s.get());
}
+
+ return impl;
}
-Socket::Kind Socket::DEFAULT_KIND()
+SocketImpl::Kind SocketImpl::DEFAULT_KIND()
{
// NOTE: Some tests may change the OpenSSL flags and reinitialize
// libprocess. In non-test code, the return value should be constant.
#ifdef USE_SSL_SOCKET
- return network::openssl::flags().enabled ? Socket::SSL : Socket::POLL;
+ return network::openssl::flags().enabled ? Kind::SSL : Kind::POLL;
#else
- return Socket::POLL;
+ return Kind::POLL;
#endif
}
-Try<Address> Socket::Impl::address() const
+Try<Address> SocketImpl::address() const
{
// TODO(benh): Cache this result so that we don't have to make
// unnecessary system calls each time.
@@ -122,7 +105,7 @@ Try<Address> Socket::Impl::address() const
}
-Try<Address> Socket::Impl::peer() const
+Try<Address> SocketImpl::peer() const
{
// TODO(benh): Cache this result so that we don't have to make
// unnecessary system calls each time.
@@ -130,7 +113,7 @@ Try<Address> Socket::Impl::peer() const
}
-Try<Address> Socket::Impl::bind(const Address& address)
+Try<Address> SocketImpl::bind(const Address& address)
{
Try<Nothing> bind = network::bind(get(), address);
if (bind.isError()) {
@@ -143,7 +126,7 @@ Try<Address> Socket::Impl::bind(const Address& address)
static Future<string> _recv(
- const std::shared_ptr<Socket::Impl>& impl,
+ const std::shared_ptr<SocketImpl>& impl,
const Option<ssize_t>& size,
Owned<string> buffer,
size_t chunk,
@@ -191,7 +174,7 @@ static Future<string> _recv(
}
-Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
+Future<string> SocketImpl::recv(const Option<ssize_t>& size)
{
// Default chunk size to attempt to receive when nothing is
// specified represents roughly 16 pages.
@@ -216,7 +199,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
static Future<Nothing> _send(
- const std::shared_ptr<Socket::Impl>& impl,
+ const std::shared_ptr<SocketImpl>& impl,
Owned<string> data,
size_t index,
size_t length)
@@ -235,7 +218,7 @@ static Future<Nothing> _send(
}
-Future<Nothing> Socket::Impl::send(const string& _data)
+Future<Nothing> SocketImpl::send(const string& _data)
{
Owned<string> data(new string(_data));
@@ -243,6 +226,6 @@ Future<Nothing> Socket::Impl::send(const string& _data)
.then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
}
-
+} // namespace internal {
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index 5f84d84..7356716 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -32,10 +32,6 @@ using process::ResponseDecoder;
using process::StreamingRequestDecoder;
using process::StreamingResponseDecoder;
-using process::http::Request;
-
-using process::network::Socket;
-
using std::deque;
using std::string;
@@ -78,7 +74,7 @@ TYPED_TEST(RequestDecoderTest, Request)
EXPECT_SOME_EQ("value2", request->url.query.get("key2"));
Future<string> body = [&request]() -> Future<string> {
- if (request->type == Request::BODY) {
+ if (request->type == http::Request::BODY) {
return request->body;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index d41929a..22ec432 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -62,7 +62,7 @@ using process::Promise;
using process::http::URL;
-using process::network::Socket;
+using process::network::inet::Socket;
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index ea798d0..0424a10 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -93,8 +93,8 @@ using process::UPID;
using process::firewall::DisabledEndpointsFirewallRule;
using process::firewall::FirewallRule;
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
using std::move;
using std::string;
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/ssl_client.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_client.cpp b/3rdparty/libprocess/src/tests/ssl_client.cpp
index 8d62fc2..de87b3b 100644
--- a/3rdparty/libprocess/src/tests/ssl_client.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_client.cpp
@@ -32,8 +32,10 @@
namespace network = process::network;
namespace openssl = network::openssl;
-using network::Address;
-using network::Socket;
+using network::inet::Address;
+using network::inet::Socket;
+
+using network::internal::SocketImpl;
using process::Future;
@@ -131,8 +133,8 @@ TEST_F(SSLClientTest, client)
// Create the socket based on the 'use_ssl' flag. We use this to
// test whether a regular socket could connect to an SSL server
// socket.
- const Try<Socket> create =
- Socket::create(flags.use_ssl ? Socket::SSL : Socket::POLL);
+ const Try<Socket> create = Socket::create(
+ flags.use_ssl ? SocketImpl::Kind::SSL : SocketImpl::Kind::POLL);
ASSERT_SOME(create);
Socket socket = create.get();
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/ssl_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_tests.cpp b/3rdparty/libprocess/src/tests/ssl_tests.cpp
index 55c8c30..bdb9420 100644
--- a/3rdparty/libprocess/src/tests/ssl_tests.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_tests.cpp
@@ -49,8 +49,10 @@ namespace io = process::io;
namespace network = process::network;
namespace openssl = network::openssl;
-using network::Address;
-using network::Socket;
+using network::inet::Address;
+using network::inet::Socket;
+
+using network::internal::SocketImpl;
using process::Clock;
using process::Failure;
@@ -112,7 +114,7 @@ TEST(SSL, Disabled)
{
os::setenv("LIBPROCESS_SSL_ENABLED", "false");
openssl::reinitialize();
- EXPECT_ERROR(Socket::create(Socket::SSL));
+ EXPECT_ERROR(Socket::create(SocketImpl::Kind::SSL));
}
@@ -130,10 +132,10 @@ TEST_P(SSLTest, BasicSameProcess)
openssl::reinitialize();
- const Try<Socket> server_create = Socket::create(Socket::SSL);
+ const Try<Socket> server_create = Socket::create(SocketImpl::Kind::SSL);
ASSERT_SOME(server_create);
- const Try<Socket> client_create = Socket::create(Socket::SSL);
+ const Try<Socket> client_create = Socket::create(SocketImpl::Kind::SSL);
ASSERT_SOME(client_create);
Socket server = server_create.get();
@@ -623,7 +625,7 @@ TEST_F(SSLTest, PeerAddress)
{"LIBPROCESS_SSL_CERT_FILE", certificate_path().string()}});
ASSERT_SOME(server);
- const Try<Socket> client_create = Socket::create(Socket::SSL);
+ const Try<Socket> client_create = Socket::create(SocketImpl::Kind::SSL);
ASSERT_SOME(client_create);
Socket client = client_create.get();
@@ -754,7 +756,7 @@ TEST_F(SSLTest, SilentSocket)
// not complete the SSL handshake, nor be downgraded.
// As a result, we expect that the server will not see
// an accepted socket for this connection.
- Try<Socket> connection = Socket::create(Socket::POLL);
+ Try<Socket> connection = Socket::create(SocketImpl::Kind::POLL);
ASSERT_SOME(connection);
connection->connect(server->address().get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 1f6cfaf..99ea1eb 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -30,8 +30,8 @@ using process::Message;
using process::MessageEncoder;
using process::UPID;
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
static const int LISTEN_BACKLOG = 10;
[05/14] mesos git commit: Removed `Socket` dependency on
`Socket::Impl`.
Posted by be...@apache.org.
Removed `Socket` dependency on `Socket::Impl`.
Review: https://reviews.apache.org/r/53458
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aa68496
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aa68496
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aa68496
Branch: refs/heads/master
Commit: 7aa68496df050ed1fc7be8aed0f9b46230039c3a
Parents: 5c093c8
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 2 22:33:31 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 26 -----------------
3rdparty/libprocess/src/libevent_ssl_socket.cpp | 30 ++++++++++----------
3rdparty/libprocess/src/libevent_ssl_socket.hpp | 6 ++--
3rdparty/libprocess/src/poll_socket.cpp | 30 +++++++++++---------
3rdparty/libprocess/src/socket.cpp | 20 ++++++-------
5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 97bdcb9..a70954a 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -202,27 +202,6 @@ public:
virtual Socket::Kind kind() const = 0;
- /**
- * Construct a new `Socket` from the given impl.
- *
- * This is a proxy function, as `Impl`s derived from this won't have
- * access to the Socket::Socket(...) constructors.
- */
- // TODO(jmlvanre): These should be protected; however, gcc complains
- // when using them from within a lambda of a derived class.
- static Socket socket(std::shared_ptr<Impl>&& that)
- {
- return Socket(std::move(that));
- }
-
- /**
- * @copydoc process::network::Socket::Impl::socket
- */
- static Socket socket(const std::shared_ptr<Impl>& that)
- {
- return Socket(that);
- }
-
protected:
explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
@@ -240,11 +219,6 @@ public:
}
/**
- * Construct a `Socket` wrapper from this implementation.
- */
- Socket socket() { return Socket(shared_from_this()); }
-
- /**
* Returns a `std::shared_ptr<T>` from this implementation.
*/
template <typename T>
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 5c0929d..9cade79 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -31,6 +31,7 @@
#include "libevent.hpp"
#include "libevent_ssl_socket.hpp"
#include "openssl.hpp"
+#include "poll_socket.hpp"
// Locking:
//
@@ -898,14 +899,15 @@ Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
}
-Future<Socket> LibeventSSLSocketImpl::accept()
+Future<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::accept()
{
// We explicitly specify the return type to avoid a type deduction
// issue in some versions of clang. See MESOS-2943.
return accept_queue.get()
- .then([](const Future<Socket>& socket) -> Future<Socket> {
- CHECK(!socket.isPending());
- return socket;
+ .then([](const Future<std::shared_ptr<Socket::Impl>>& impl)
+ -> Future<std::shared_ptr<Socket::Impl>> {
+ CHECK(!impl.isPending());
+ return impl;
});
}
@@ -964,12 +966,12 @@ void LibeventSSLSocketImpl::peek_callback(
if (ssl) {
accept_SSL_callback(request);
} else {
- // Downgrade to a non-SSL socket.
- Try<Socket> create = Socket::create(Socket::POLL, fd);
- if (create.isError()) {
- request->promise.fail(create.error());
+ // Downgrade to a non-SSL socket implementation.
+ Try<std::shared_ptr<Socket::Impl>> impl = PollSocketImpl::create(fd);
+ if (impl.isError()) {
+ request->promise.fail(impl.error());
} else {
- request->promise.set(create.get());
+ request->promise.set(impl.get());
}
delete request;
@@ -981,14 +983,14 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
{
CHECK(__in_event_loop__);
- Queue<Future<Socket>> accept_queue_ = accept_queue;
+ Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue_ = accept_queue;
// After the socket is accepted, it must complete the SSL
// handshake (or be downgraded to a regular socket) before
// we put it in the queue of connected sockets.
request->promise.future()
- .onAny([accept_queue_](Future<Socket> socket) mutable {
- accept_queue_.put(socket);
+ .onAny([accept_queue_](Future<std::shared_ptr<Socket::Impl>> impl) mutable {
+ accept_queue_.put(impl);
});
// If we support downgrading the connection, first wait for this
@@ -1119,9 +1121,7 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
&LibeventSSLSocketImpl::event_callback,
CHECK_NOTNULL(impl->event_loop_handle));
- Socket socket = Socket::Impl::socket(std::move(impl));
-
- request->promise.set(socket);
+ request->promise.set(std::dynamic_pointer_cast<Socket::Impl>(impl));
} else if (events & BEV_EVENT_ERROR) {
std::ostringstream stream;
if (EVUTIL_SOCKET_ERROR() != 0) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index acb00d4..ed53976 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -45,7 +45,7 @@ public:
virtual Future<size_t> send(const char* data, size_t size);
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
virtual Try<Nothing> listen(int backlog);
- virtual Future<Socket> accept();
+ virtual Future<std::shared_ptr<Socket::Impl>> accept();
virtual Socket::Kind kind() const { return Socket::SSL; }
// This call is used to do the equivalent of shutting down the read
@@ -74,7 +74,7 @@ private:
socket(_socket),
ip(_ip) {}
event* peek_event;
- Promise<Socket> promise;
+ Promise<std::shared_ptr<Socket::Impl>> promise;
evconnlistener* listener;
int socket;
Option<net::IP> ip;
@@ -175,7 +175,7 @@ private:
// downgraded). The 'accept()' call returns sockets from this queue.
// We wrap the socket in a 'Future' so that we can pass failures or
// discards through.
- Queue<Future<Socket>> accept_queue;
+ Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue;
Option<std::string> peer_hostname;
Option<net::IP> peer_ip;
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index d04f048..ff06e56 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -114,12 +114,14 @@ Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
namespace internal {
-Future<Nothing> connect(const Socket& socket, const Address& to)
+Future<Nothing> connect(
+ const std::shared_ptr<PollSocketImpl>& socket,
+ const Address& to)
{
// Now check that a successful connection was made.
int opt;
socklen_t optlen = sizeof(opt);
- int s = socket.get();
+ int s = socket->get();
// NOTE: We cast to `char*` here because the function prototypes on Windows
// use `char*` instead of `void*`.
@@ -149,7 +151,7 @@ Future<Nothing> PollSocketImpl::connect(const Address& address)
if (connect.isError()) {
if (net::is_inprogress_error(connect.error().code)) {
return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, socket(), address));
+ .then(lambda::bind(&internal::connect, shared(this), address));
}
return Failure(connect.error());
@@ -167,12 +169,14 @@ Future<size_t> PollSocketImpl::recv(char* data, size_t size)
namespace internal {
-Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
+Future<size_t> socket_send_data(
+ const std::shared_ptr<PollSocketImpl>& impl,
+ const char* data, size_t size)
{
CHECK(size > 0);
while (true) {
- ssize_t length = net::send(socket.get(), data, size, MSG_NOSIGNAL);
+ ssize_t length = net::send(impl->get(), data, size, MSG_NOSIGNAL);
#ifdef __WINDOWS__
int error = WSAGetLastError();
@@ -185,8 +189,8 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
continue;
} else if (length < 0 && net::is_retryable_error(error)) {
// Might block, try again later.
- return io::poll(socket.get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, socket, data, size));
+ return io::poll(impl->get(), io::WRITE)
+ .then(lambda::bind(&internal::socket_send_data, impl, data, size));
} else if (length <= 0) {
// Socket error or closed.
if (length < 0) {
@@ -207,7 +211,7 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
Future<size_t> socket_send_file(
- Socket socket,
+ const std::shared_ptr<PollSocketImpl>& impl,
int fd,
off_t offset,
size_t size)
@@ -216,7 +220,7 @@ Future<size_t> socket_send_file(
while (true) {
Try<ssize_t, SocketError> length =
- os::sendfile(socket.get(), fd, offset, size);
+ os::sendfile(impl->get(), fd, offset, size);
if (length.isSome()) {
CHECK(length.get() >= 0);
@@ -232,10 +236,10 @@ Future<size_t> socket_send_file(
continue;
} else if (net::is_retryable_error(length.error().code)) {
// Might block, try again later.
- return io::poll(socket.get(), io::WRITE)
+ return io::poll(impl->get(), io::WRITE)
.then(lambda::bind(
&internal::socket_send_file,
- socket,
+ impl,
fd,
offset,
size));
@@ -255,7 +259,7 @@ Future<size_t> PollSocketImpl::send(const char* data, size_t size)
return io::poll(get(), io::WRITE)
.then(lambda::bind(
&internal::socket_send_data,
- socket(),
+ shared(this),
data,
size));
}
@@ -266,7 +270,7 @@ Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
return io::poll(get(), io::WRITE)
.then(lambda::bind(
&internal::socket_send_file,
- socket(),
+ shared(this),
fd,
offset,
size));
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 7f93168..8f372aa 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -143,7 +143,7 @@ Try<Address> Socket::Impl::bind(const Address& address)
static Future<string> _recv(
- Socket socket,
+ const std::shared_ptr<Socket::Impl>& impl,
const Option<ssize_t>& size,
Owned<string> buffer,
size_t chunk,
@@ -165,9 +165,9 @@ static Future<string> _recv(
// We've been asked to receive until EOF so keep receiving since
// according to the 'length == 0' check above we haven't reached
// EOF yet.
- return socket.recv(data.get(), chunk)
+ return impl->recv(data.get(), chunk)
.then(lambda::bind(&_recv,
- socket,
+ impl,
size,
buffer,
chunk,
@@ -176,9 +176,9 @@ static Future<string> _recv(
} else if (static_cast<string::size_type>(size.get()) > buffer->size()) {
// We've been asked to receive a particular amount of data and we
// haven't yet received that much data so keep receiving.
- return socket.recv(data.get(), size.get() - buffer->size())
+ return impl->recv(data.get(), size.get() - buffer->size())
.then(lambda::bind(&_recv,
- socket,
+ impl,
size,
buffer,
chunk,
@@ -206,7 +206,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
return recv(data.get(), chunk)
.then(lambda::bind(&_recv,
- socket(),
+ shared_from_this(),
size,
buffer,
chunk,
@@ -216,7 +216,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
static Future<Nothing> _send(
- Socket socket,
+ const std::shared_ptr<Socket::Impl>& impl,
Owned<string> data,
size_t index,
size_t length)
@@ -230,8 +230,8 @@ static Future<Nothing> _send(
}
// Keep sending!
- return socket.send(data->data() + index, data->size() - index)
- .then(lambda::bind(&_send, socket, data, index, lambda::_1));
+ return impl->send(data->data() + index, data->size() - index)
+ .then(lambda::bind(&_send, impl, data, index, lambda::_1));
}
@@ -240,7 +240,7 @@ Future<Nothing> Socket::Impl::send(const string& _data)
Owned<string> data(new string(_data));
return send(data->data(), data->size())
- .then(lambda::bind(&_send, socket(), data, 0, lambda::_1));
+ .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
}
[08/14] mesos git commit: Refactor necessary after removing default
Address constructor.
Posted by be...@apache.org.
Refactor necessary after removing default Address constructor.
Review: https://reviews.apache.org/r/54173
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46942677
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46942677
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46942677
Branch: refs/heads/master
Commit: 46942677e388d3a9835bbb1ddaabadf4b6e92ccd
Parents: 907ef80
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Nov 29 11:23:45 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 58 +++++++++++++++++---------------
1 file changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/46942677/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 8204327..200cefc 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -442,24 +442,24 @@ private:
set<int> dispose;
// Map from socket to socket address for outbound sockets.
- map<int, Address> addresses;
+ hashmap<int, Address> addresses;
// Map from socket address to temporary sockets (outbound sockets
// that will be closed once there is no more data to send on them).
- map<Address, int> temps;
+ hashmap<Address, int> temps;
// Map from socket address (ip, port) to persistent sockets
// (outbound sockets that will remain open even if there is no more
// data to send on them). We distinguish these from the 'temps'
// collection so we can tell when a persistent socket has been lost
// (and thus generate ExitedEvents).
- map<Address, int> persists;
+ hashmap<Address, int> persists;
// Map from outbound socket to outgoing queue.
- map<int, queue<Encoder*>> outgoing;
+ hashmap<int, queue<Encoder*>> outgoing;
// HTTP proxies.
- map<int, HttpProxy*> proxies;
+ hashmap<int, HttpProxy*> proxies;
// Protects instance variables.
std::recursive_mutex mutex;
@@ -1822,9 +1822,9 @@ void SocketManager::link(
CHECK(sockets.count(s) == 0);
sockets.emplace(s, socket.get());
- addresses[s] = to.address;
+ addresses.emplace(s, to.address);
- persists[to.address] = s;
+ persists.emplace(to.address, s);
// Initialize 'outgoing' to prevent a race with
// SocketManager::send() while the socket is not yet connected.
@@ -2220,8 +2220,8 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
CHECK(sockets.count(s) == 0);
sockets.emplace(s, socket.get());
- addresses[s] = address;
- temps[address] = s;
+ addresses.emplace(s, address);
+ temps.emplace(address, s);
dispose.insert(s);
@@ -2285,10 +2285,10 @@ Encoder* SocketManager::next(int s)
// This is either a temporary socket we created or it's a
// socket that we were receiving data from and possibly
// sending HTTP responses back on. Clean up either way.
- if (addresses.count(s) > 0) {
- const Address& address = addresses[s];
- CHECK(temps.count(address) > 0 && temps[address] == s);
- temps.erase(address);
+ Option<Address> address = addresses.get(s);
+ if (address.isSome()) {
+ CHECK(temps.count(address.get()) > 0 && temps[address.get()] == s);
+ temps.erase(address.get());
addresses.erase(s);
}
@@ -2356,15 +2356,15 @@ void SocketManager::close(int s)
}
// Clean up after sockets used for remote communication.
- if (addresses.count(s) > 0) {
- const Address& address = addresses[s];
-
+ Option<Address> address = addresses.get(s);
+ if (address.isSome()) {
// Don't bother invoking `exited` unless socket was persistent.
- if (persists.count(address) > 0 && persists[address] == s) {
- persists.erase(address);
- exited(address); // Generate ExitedEvent(s)!
- } else if (temps.count(address) > 0 && temps[address] == s) {
- temps.erase(address);
+ if (persists.count(address.get()) > 0 && persists[address.get()] == s) {
+ persists.erase(address.get());
+ exited(address.get()); // Generate ExitedEvent(s)!
+ } else if (temps.count(address.get()) > 0 &&
+ temps[address.get()] == s) {
+ temps.erase(address.get());
}
addresses.erase(s);
@@ -2551,18 +2551,20 @@ void SocketManager::swap_implementing_socket(
// Update the fd that this address is associated with. Once we've
// done this we can update the 'temps' and 'persists'
// data structures using this updated address.
- addresses[to_fd] = addresses[from_fd];
+ Option<Address> address = addresses.get(from_fd);
+ CHECK_SOME(address);
+ addresses.emplace(to_fd, address.get());
addresses.erase(from_fd);
// If this address is a persistent or temporary link
// that matches the original FD.
- if (persists.count(addresses[to_fd]) > 0 &&
- persists.at(addresses[to_fd]) == from_fd) {
- persists[addresses[to_fd]] = to_fd;
+ if (persists.count(address.get()) > 0 &&
+ persists.at(address.get()) == from_fd) {
+ persists[address.get()] = to_fd;
// No need to erase as we're changing the value, not the key.
- } else if (temps.count(addresses[to_fd]) > 0 &&
- temps.at(addresses[to_fd]) == from_fd) {
- temps[addresses[to_fd]] = to_fd;
+ } else if (temps.count(address.get()) > 0 &&
+ temps.at(address.get()) == from_fd) {
+ temps[address.get()] = to_fd;
// No need to erase as we're changing the value, not the key.
}