You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/02/08 16:51:34 UTC
[07/20] mesos git commit: Replaced Node with network::Address.
Replaced Node with network::Address.
Review: https://reviews.apache.org/r/29538
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/257bd1ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/257bd1ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/257bd1ad
Branch: refs/heads/master
Commit: 257bd1adaf8f0e236d975c4a2d93aa06cc6a79f1
Parents: 55f2325
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 24 08:48:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/Makefile.am | 2 +-
3rdparty/libprocess/include/process/address.hpp | 84 +++++++++++
3rdparty/libprocess/include/process/future.hpp | 2 +-
3rdparty/libprocess/include/process/network.hpp | 47 +++---
3rdparty/libprocess/include/process/node.hpp | 72 ---------
3rdparty/libprocess/include/process/pid.hpp | 36 ++---
3rdparty/libprocess/include/process/process.hpp | 5 +-
3rdparty/libprocess/include/process/socket.hpp | 20 ++-
3rdparty/libprocess/src/http.cpp | 23 +--
3rdparty/libprocess/src/pid.cpp | 20 +--
3rdparty/libprocess/src/poll_socket.cpp | 4 +-
3rdparty/libprocess/src/poll_socket.hpp | 2 +-
3rdparty/libprocess/src/process.cpp | 151 ++++++++++---------
3rdparty/libprocess/src/socket.cpp | 14 +-
3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
3rdparty/libprocess/src/tests/http_tests.cpp | 2 +-
3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
3rdparty/libprocess/src/tests/process_tests.cpp | 13 +-
18 files changed, 273 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index 988c8d5..3da3e6c 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -1,5 +1,6 @@
# Headers.
nobase_include_HEADERS = \
+ process/address.hpp \
process/async.hpp \
process/check.hpp \
process/clock.hpp \
@@ -32,7 +33,6 @@ nobase_include_HEADERS = \
process/mime.hpp \
process/mutex.hpp \
process/network.hpp \
- process/node.hpp \
process/once.hpp \
process/owned.hpp \
process/pid.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
new file mode 100644
index 0000000..7db6271
--- /dev/null
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -0,0 +1,84 @@
+#ifndef __PROCESS_ADDRESS_HPP__
+#define __PROCESS_ADDRESS_HPP__
+
+#include <stdint.h>
+#include <unistd.h>
+
+#include <arpa/inet.h>
+
+#include <glog/logging.h>
+
+#include <sstream>
+
+#include <boost/functional/hash.hpp>
+
+namespace process {
+namespace network {
+
+// Represents a network "address", subsuming the struct addrinfo and
+// struct sockaddr* that are typically used to encapsulate IP and port.
+class Address
+{
+public:
+ Address() : ip(0), port(0) {}
+
+ Address(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
+
+ bool operator < (const Address& that) const
+ {
+ if (ip == that.ip) {
+ return port < that.port;
+ } else {
+ return ip < that.ip;
+ }
+ }
+
+ bool operator > (const Address& that) const
+ {
+ if (ip == that.ip) {
+ return port > that.port;
+ } else {
+ return ip > that.ip;
+ }
+ }
+
+ bool operator == (const Address& that) const
+ {
+ return (ip == that.ip && port == that.port);
+ }
+
+ bool operator != (const Address& that) const
+ {
+ return !(*this == that);
+ }
+
+ uint32_t ip;
+ uint16_t port;
+};
+
+
+inline std::ostream& operator << (std::ostream& stream, const Address& address)
+{
+ char ip[INET_ADDRSTRLEN];
+ if (inet_ntop(AF_INET, (in_addr*) &address.ip, ip, INET_ADDRSTRLEN) == NULL) {
+ PLOG(FATAL) << "Failed to get human-readable IP address for '"
+ << address.ip << "'";
+ }
+ stream << ip << ":" << address.port;
+ return stream;
+}
+
+
+// Address hash value (for example, to use in Boost's unordered maps).
+inline std::size_t hash_value(const Address& address)
+{
+ size_t seed = 0;
+ boost::hash_combine(seed, address.ip);
+ boost::hash_combine(seed, address.port);
+ return seed;
+}
+
+} // namespace network {
+} // namespace process {
+
+#endif // __PROCESS_ADDRESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 05bfa34..68d6682 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -37,7 +37,7 @@
namespace process {
-// Forward declaration (instead of include to break circular dependency).
+// Forward declarations (instead of include to break circular dependency).
template <typename _F>
struct _Defer;
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/network.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/network.hpp b/3rdparty/libprocess/include/process/network.hpp
index 2ceea8c..7620278 100644
--- a/3rdparty/libprocess/include/process/network.hpp
+++ b/3rdparty/libprocess/include/process/network.hpp
@@ -1,7 +1,7 @@
#ifndef __PROCESS_NETWORK_HPP__
#define __PROCESS_NETWORK_HPP__
-#include <process/node.hpp>
+#include <process/address.hpp>
#include <stout/net.hpp>
#include <stout/try.hpp>
@@ -31,7 +31,8 @@ inline Try<int> socket(int family, int type, int protocol)
return s;
}
-// accept, bind, connect, getsockname wrappers for different protocol families
+
+// TODO(benh): Remove and defer to Socket::accept.
inline Try<int> accept(int s, sa_family_t family)
{
switch (family) {
@@ -52,48 +53,54 @@ inline Try<int> accept(int s, sa_family_t family)
}
-inline Try<int> bind(int s, const Node& node)
+// TODO(benh): Remove and defer to Socket::bind.
+inline Try<int> bind(int s, const Address& address)
{
- sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
+ sockaddr_in addr = net::createSockaddrIn(address.ip, address.port);
int error = ::bind(s, (sockaddr*) &addr, sizeof(addr));
if (error < 0) {
- return ErrnoError("Failed to bind on " + stringify(node));
+ return ErrnoError("Failed to bind on " + stringify(address));
}
return error;
}
-inline Try<int> connect(int s, const Node& node)
+// TODO(benh): Remove and defer to Socket::connect.
+inline Try<int> connect(int s, const Address& address)
{
- sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
+ sockaddr_in addr = net::createSockaddrIn(address.ip, address.port);
int error = ::connect(s, (sockaddr*) &addr, sizeof(addr));
if (error < 0) {
- return ErrnoError("Failed to connect to " + stringify(node));
+ return ErrnoError("Failed to connect to " + stringify(address));
}
return error;
}
-inline Try<Node> getsockname(int s, sa_family_t family)
+inline Try<Address> address(int s)
{
- switch (family) {
- case AF_INET: {
- sockaddr_in addr = net::createSockaddrIn(0, 0);
- socklen_t addrlen = sizeof(addr);
+ union {
+ struct sockaddr s;
+ struct sockaddr_in v4;
+ struct sockaddr_in6 v6;
+ } addr;
- if(::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) {
- return ErrnoError("Failed to getsockname");
- }
+ socklen_t addrlen = sizeof(addr);
- return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
- }
- default:
- return Error("Unsupported family type: " + stringify(family));
+ if (::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) {
+ return ErrnoError("Failed to getsockname");
}
+
+ if (addr.s.sa_family == AF_INET) {
+ return Address(addr.v4.sin_addr.s_addr, ntohs(addr.v4.sin_port));
+ }
+
+ return Error("Unsupported IP address family '" +
+ stringify(addr.s.sa_family) + "'");
}
} // namespace network {
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/node.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/node.hpp b/3rdparty/libprocess/include/process/node.hpp
deleted file mode 100644
index 173eb8a..0000000
--- a/3rdparty/libprocess/include/process/node.hpp
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef __PROCESS_NODE_HPP__
-#define __PROCESS_NODE_HPP__
-
-#include <stdint.h>
-#include <unistd.h>
-
-#include <arpa/inet.h>
-
-#include <glog/logging.h>
-
-#include <sstream>
-
-#include <boost/functional/hash.hpp>
-
-namespace process {
-
-// Represents a remote "node" (encapsulates IP address and port).
-class Node
-{
-public:
- Node() : ip(0), port(0) {}
-
- Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
-
- bool operator < (const Node& that) const
- {
- if (ip == that.ip) {
- return port < that.port;
- } else {
- return ip < that.ip;
- }
- }
-
- bool operator == (const Node& that) const
- {
- return (ip == that.ip && port == that.port);
- }
-
- bool operator != (const Node& that) const
- {
- return !(*this == that);
- }
-
- uint32_t ip;
- uint16_t port;
-};
-
-
-inline std::ostream& operator << (std::ostream& stream, const Node& node)
-{
- char ip[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) == NULL) {
- PLOG(FATAL) << "Failed to get human-readable IP address for '"
- << node.ip << "'";
- }
- stream << ip << ":" << node.port;
- return stream;
-}
-
-
-// UPID hash value (for example, to use in Boost's unordered maps).
-inline std::size_t hash_value(const Node& node)
-{
- size_t seed = 0;
- boost::hash_combine(seed, node.ip);
- boost::hash_combine(seed, node.port);
- return seed;
-}
-
-} // namespace process {
-
-#endif // __PROCESS_NODE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 7dccf29..30c466c 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -7,7 +7,7 @@
#include <sstream>
#include <string>
-#include <process/node.hpp>
+#include <process/address.hpp>
namespace process {
@@ -20,19 +20,19 @@ struct UPID
UPID() = default;
UPID(const UPID& that)
- : id(that.id), node(that.node) {}
+ : id(that.id), address(that.address) {}
UPID(const char* id_, uint32_t ip_, uint16_t port_)
- : id(id_), node(ip_, port_) {}
+ : id(id_), address(ip_, port_) {}
- UPID(const char* id_, const Node& node_)
- : id(id_), node(node_) {}
+ UPID(const char* id_, const network::Address& address_)
+ : id(id_), address(address_) {}
UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
- : id(id_), node(ip_, port_) {}
+ : id(id_), address(ip_, port_) {}
- UPID(const std::string& id_, const Node& node_)
- : id(id_), node(node_) {}
+ UPID(const std::string& id_, const network::Address& address_)
+ : id(id_), address(address_) {}
/*implicit*/ UPID(const char* s);
@@ -44,24 +44,26 @@ struct UPID
operator bool () const
{
- return id != "" && node.ip != 0 && node.port != 0;
+ return id != "" && address.ip != 0 && address.port != 0;
}
bool operator ! () const // NOLINT(whitespace/operators)
{
- return id == "" && node.ip == 0 && node.port == 0;
+ return id == "" && address.ip == 0 && address.port == 0;
}
bool operator < (const UPID& that) const
{
- if (node == that.node)
- return id < that.id;
- else return node < that.node;
+ if (address == that.address) {
+ return id < that.id;
+ } else {
+ return address < that.address;
+ }
}
bool operator == (const UPID& that) const
{
- return (id == that.id && node == that.node);
+ return (id == that.id && address == that.address);
}
bool operator != (const UPID& that) const
@@ -70,7 +72,7 @@ struct UPID
}
std::string id;
- Node node;
+ network::Address address;
};
@@ -91,7 +93,7 @@ struct PID : UPID
(void)base; // Eliminate unused base warning.
PID<Base> pid;
pid.id = id;
- pid.node = node;
+ pid.address = address;
return pid;
}
};
@@ -107,6 +109,4 @@ std::size_t hash_value(const UPID&);
} // namespace process {
-
-
#endif // __PROCESS_PID_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 3708f98..392c74d 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -7,6 +7,7 @@
#include <map>
#include <queue>
+#include <process/address.hpp>
#include <process/clock.hpp>
#include <process/event.hpp>
#include <process/filter.hpp>
@@ -276,9 +277,9 @@ void finalize();
/**
- * Returns the node associated with this instance of the library.
+ * Returns the socket address associated with this instance of the library.
*/
-Node node();
+network::Address address();
/**
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 06161f2..426b3fa 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -3,8 +3,8 @@
#include <memory>
+#include <process/address.hpp>
#include <process/future.hpp>
-#include <process/node.hpp>
#include <stout/abort.hpp>
#include <stout/nothing.hpp>
@@ -60,10 +60,11 @@ public:
}
// Socket::Impl interface.
- virtual Try<Node> bind(const Node& node);
+ virtual Try<Address> address() const;
+ virtual Try<Address> bind(const Address& address);
virtual Try<Nothing> listen(int backlog) = 0;
virtual Future<Socket> accept() = 0;
- virtual Future<Nothing> connect(const Node& node) = 0;
+ virtual Future<Nothing> connect(const Address& address) = 0;
virtual Future<size_t> recv(char* data, size_t size) = 0;
virtual Future<size_t> send(const char* data, size_t size) = 0;
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
@@ -112,14 +113,19 @@ public:
return impl->get();
}
+ Try<Address> address() const
+ {
+ return impl->address();
+ }
+
int get() const
{
return impl->get();
}
- Try<Node> bind(const Node& node)
+ Try<Address> bind(const Address& address)
{
- return impl->bind(node);
+ return impl->bind(address);
}
Try<Nothing> listen(int backlog)
@@ -132,9 +138,9 @@ public:
return impl->accept();
}
- Future<Nothing> connect(const Node& node)
+ Future<Nothing> connect(const Address& address)
{
- return impl->connect(node);
+ return impl->connect(address);
}
Future<size_t> recv(char* data, size_t size) const
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index c18de81..7503313 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -27,6 +27,7 @@ using std::vector;
using process::http::Request;
using process::http::Response;
+using process::network::Address;
using process::network::Socket;
namespace process {
@@ -112,7 +113,7 @@ Future<Response> decode(const string& buffer)
// Forward declaration.
Future<Response> _request(
Socket socket,
- const Node& node,
+ const Address& address,
const URL& url,
const string& method,
const Option<hashmap<string, string>>& _headers,
@@ -139,10 +140,10 @@ Future<Response> request(
Socket socket = create.get();
- Node node;
+ Address address;
if (url.ip.isSome()) {
- node.ip = url.ip.get().address();
+ address.ip = url.ip.get().address();
} else if (url.domain.isNone()) {
return Failure("Missing URL domain or IP");
} else {
@@ -153,15 +154,15 @@ Future<Response> request(
url.domain.get() + "': " + ip.error());
}
- node.ip = ip.get();
+ address.ip = ip.get();
}
- node.port = url.port;
+ address.port = url.port;
- return socket.connect(node)
+ return socket.connect(address)
.then(lambda::bind(&_request,
socket,
- node,
+ address,
url,
method,
headers,
@@ -172,7 +173,7 @@ Future<Response> request(
Future<Response> _request(
Socket socket,
- const Node& node,
+ const Address& address,
const URL& url,
const string& method,
const Option<hashmap<string, string>>& _headers,
@@ -208,7 +209,7 @@ Future<Response> _request(
}
// Need to specify the 'Host' header.
- headers["Host"] = stringify(node);
+ headers["Host"] = stringify(address);
// Tell the server to close the connection when it's done.
headers["Connection"] = "close";
@@ -291,7 +292,7 @@ Future<Response> get(
const Option<string>& query,
const Option<hashmap<string, string>>& headers)
{
- URL url("http", net::IP(upid.node.ip), upid.node.port, upid.id);
+ URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
@@ -320,7 +321,7 @@ Future<Response> post(
const Option<string>& body,
const Option<string>& contentType)
{
- URL url("http", net::IP(upid.node.ip), upid.node.port, upid.id);
+ URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index 085e0b9..d28a154 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -48,7 +48,7 @@ UPID::UPID(const std::string& s)
UPID::UPID(const ProcessBase& process)
{
id = process.self().id;
- node = process.self().node;
+ address = process.self().address;
}
@@ -62,7 +62,7 @@ UPID::operator std::string() const
ostream& operator << (ostream& stream, const UPID& pid)
{
- stream << pid.id << "@" << pid.node;
+ stream << pid.id << "@" << pid.address;
return stream;
}
@@ -70,8 +70,8 @@ ostream& operator << (ostream& stream, const UPID& pid)
istream& operator >> (istream& stream, UPID& pid)
{
pid.id = "";
- pid.node.ip = 0;
- pid.node.port = 0;
+ pid.address.ip = 0;
+ pid.address.port = 0;
string str;
if (!(stream >> str)) {
@@ -88,7 +88,7 @@ istream& operator >> (istream& stream, UPID& pid)
string id;
string host;
- Node node;
+ network::Address address;
size_t index = str.find('@');
@@ -119,17 +119,17 @@ istream& operator >> (istream& stream, UPID& pid)
return stream;
}
- node.ip = ip.get();
+ address.ip = ip.get();
str = str.substr(index + 1);
- if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
+ if (sscanf(str.c_str(), "%hu", &address.port) != 1) {
stream.setstate(std::ios_base::badbit);
return stream;
}
pid.id = id;
- pid.node = node;
+ pid.address = address;
return stream;
}
@@ -139,8 +139,8 @@ size_t hash_value(const UPID& pid)
{
size_t seed = 0;
boost::hash_combine(seed, pid.id);
- boost::hash_combine(seed, pid.node.ip);
- boost::hash_combine(seed, pid.node.port);
+ boost::hash_combine(seed, pid.address.ip);
+ boost::hash_combine(seed, pid.address.port);
return seed;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index eaeabd7..ec781a3 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -101,9 +101,9 @@ Future<Nothing> connect(const Socket& socket)
} // namespace internal {
-Future<Nothing> PollSocketImpl::connect(const Node& node)
+Future<Nothing> PollSocketImpl::connect(const Address& address)
{
- Try<int> connect = network::connect(get(), node);
+ Try<int> connect = network::connect(get(), address);
if (connect.isError()) {
if (errno == EINPROGRESS) {
return io::poll(get(), io::WRITE)
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index f7ca08e..553aa64 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -19,7 +19,7 @@ public:
// Implementation of the Socket::Impl interface.
virtual Try<Nothing> listen(int backlog);
virtual Future<Socket> accept();
- virtual Future<Nothing> connect(const Node& node);
+ virtual Future<Nothing> connect(const Address& address);
virtual Future<size_t> recv(char* data, size_t size);
virtual Future<size_t> send(const char* data, size_t size);
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 67b6b3b..a1d9121 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -40,6 +40,7 @@
#include <stdexcept>
#include <vector>
+#include <process/address.hpp>
#include <process/check.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
@@ -54,7 +55,6 @@
#include <process/io.hpp>
#include <process/logging.hpp>
#include <process/mime.hpp>
-#include <process/node.hpp>
#include <process/process.hpp>
#include <process/profiler.hpp>
#include <process/socket.hpp>
@@ -96,6 +96,8 @@ using process::http::OK;
using process::http::Request;
using process::http::Response;
using process::http::ServiceUnavailable;
+
+using process::network::Address;
using process::network::Socket;
using std::deque;
@@ -283,7 +285,7 @@ public:
void close(int s);
- void exited(const Node& node);
+ void exited(const Address& address);
void exited(ProcessBase* process);
private:
@@ -293,11 +295,12 @@ private:
{
// For links, we maintain a bidirectional mapping between the
// "linkers" (Processes) and the "linkees" (remote / local UPIDs).
- // For remote nodes, we also need a mapping to the linkees on the
- // node, because socket closure only notifies at the node level.
+ // For remote socket addresses, we also need a mapping to the
+ // linkees for that socket address, because socket closure only
+ // notifies at the address level.
hashmap<UPID, hashset<ProcessBase*>> linkers;
hashmap<ProcessBase*, hashset<UPID>> linkees;
- hashmap<Node, hashset<UPID>> remotes;
+ hashmap<Address, hashset<UPID>> remotes;
} links;
// Collection of all actice sockets.
@@ -308,19 +311,19 @@ private:
// them).
set<int> dispose;
- // Map from socket to node (ip, port).
- map<int, Node> nodes;
+ // Map from socket to socket address (ip, port).
+ map<int, Address> addresses;
- // Maps from node (ip, port) to temporary sockets (i.e., they will
- // get closed once there is no more data to send on them).
- map<Node, int> temps;
+ // Maps from socket address (ip, port) to temporary sockets (i.e.,
+ // they will get closed once there is no more data to send on them).
+ map<Address, int> temps;
- // Maps from node (ip, port) to persistent sockets (i.e., they will
+ // Maps from socket address (ip, port) to persistent sockets (i.e., they will
// remain open even if there is no more data to send on them). We
// distinguish these from the 'temps' collection so we can tell when
// a persistant socket has been lost (and thus generate
// ExitedEvents).
- map<Node, int> persists;
+ map<Address, int> persists;
// Map from socket to outgoing queue.
map<int, queue<Encoder*> > outgoing;
@@ -449,8 +452,8 @@ static const int LISTEN_BACKLOG = 500000;
// Local server socket.
static Socket* __s__ = NULL;
-// Local node.
-static Node __node__;
+// Local socket address.
+static Address __address__;
// Active SocketManager (eventually will probably be thread-local).
static SocketManager* socket_manager = NULL;
@@ -508,7 +511,7 @@ static Message* encode(const UPID& from,
static void transport(Message* message, ProcessBase* sender = NULL)
{
- if (message->to.node == __node__) {
+ if (message->to.address == __address__) {
// Local message.
process_manager->deliver(message->to, new MessageEvent(message), sender);
} else {
@@ -565,7 +568,7 @@ static Message* parse(Request* request)
return NULL;
}
- const UPID to(decode.get(), __node__);
+ const UPID to(decode.get(), __address__);
// And now determine 'name'.
index = index != string::npos ? index + 2: request->path.size();
@@ -821,15 +824,15 @@ void initialize(const string& delegate)
LOG(FATAL) << "Failed to initialize, pthread_create";
}
- __node__.ip = 0;
- __node__.port = 0;
+ __address__.ip = 0;
+ __address__.port = 0;
char* value;
// Check environment for ip.
value = getenv("LIBPROCESS_IP");
if (value != NULL) {
- int result = inet_pton(AF_INET, value, &__node__.ip);
+ int result = inet_pton(AF_INET, value, &__address__.ip);
if (result == 0) {
LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
} else if (result < 0) {
@@ -844,10 +847,10 @@ void initialize(const string& delegate)
if (result < 0 || result > USHRT_MAX) {
LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
}
- __node__.port = result;
+ __address__.port = result;
}
- // Create a "server" socket for communicating with other nodes.
+ // Create a "server" socket for communicating.
Try<Socket> create = Socket::create();
if (create.isError()) {
PLOG(FATAL) << "Failed to construct server socket:" << create.error();
@@ -860,18 +863,18 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
- Try<Node> bind = __s__->bind(__node__);
+ Try<Address> bind = __s__->bind(__address__);
if (bind.isError()) {
PLOG(FATAL) << "Failed to initialize: " << bind.error();
}
- __node__ = bind.get();
+ __address__ = bind.get();
// Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
// actually have a valid external ip address. Note that we need only
// one ip address, so that other processes can send and receive and
// don't get confused as to whom they are sending to.
- if (__node__.ip == 0 || __node__.ip == 2130706433) {
+ if (__address__.ip == 0 || __address__.ip == 2130706433) {
char hostname[512];
if (gethostname(hostname, sizeof(hostname)) < 0) {
@@ -886,7 +889,7 @@ void initialize(const string& delegate)
LOG(FATAL) << ip.error();
}
- __node__.ip = ip.get();
+ __address__.ip = ip.get();
}
Try<Nothing> listen = __s__->listen(LISTEN_BACKLOG);
@@ -955,7 +958,7 @@ void initialize(const string& delegate)
new Route("/__processes__", None(), __processes__);
- VLOG(1) << "libprocess is initialized on " << node() << " for " << cpus
+ VLOG(1) << "libprocess is initialized on " << address() << " for " << cpus
<< " cpus";
}
@@ -969,10 +972,10 @@ void finalize()
}
-Node node()
+Address address()
{
process::initialize();
- return __node__;
+ return __address__;
}
@@ -1328,8 +1331,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
bool connect = false;
synchronized (this) {
- // Check if node is remote and there isn't a persistant link.
- if (to.node != __node__ && persists.count(to.node) == 0) {
+ // Check if the socket address is remote and there isn't a persistent link.
+ if (to.address != __address__ && persists.count(to.address) == 0) {
// Okay, no link, let's create a socket.
Try<Socket> create = Socket::create();
if (create.isError()) {
@@ -1340,9 +1343,9 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
int s = socket.get().get();
sockets[s] = new Socket(socket.get());
- nodes[s] = to.node;
+ addresses[s] = to.address;
- persists[to.node] = s;
+ persists[to.address] = s;
// Initialize 'outgoing' to prevent a race with
// SocketManager::send() while the socket is not yet connected.
@@ -1356,14 +1359,14 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
links.linkers[to].insert(process);
links.linkees[process].insert(to);
- if (to.node != __node__) {
- links.remotes[to.node].insert(to);
+ if (to.address != __address__) {
+ links.remotes[to.address].insert(to);
}
}
if (connect) {
CHECK_SOME(socket);
- Socket(socket.get()).connect(to.node) // Copy to drop const.
+ Socket(socket.get()).connect(to.address) // Copy to drop const.
.onAny(lambda::bind(
&internal::link_connect,
lambda::_1,
@@ -1574,17 +1577,17 @@ void SocketManager::send(Message* message)
{
CHECK(message != NULL);
- const Node& node = message->to.node;
+ const Address& address = message->to.address;
Option<Socket> socket = None();
bool connect = false;
synchronized (this) {
// Check if there is already a socket.
- bool persist = persists.count(node) > 0;
- bool temp = temps.count(node) > 0;
+ bool persist = persists.count(address) > 0;
+ bool temp = temps.count(address) > 0;
if (persist || temp) {
- int s = persist ? persists[node] : temps[node];
+ int s = persist ? persists[address] : temps[address];
CHECK(sockets.count(s) > 0);
socket = *sockets[s];
@@ -1603,8 +1606,8 @@ void SocketManager::send(Message* message)
}
} else {
- // No peristent or temporary socket to the node currently
- // exists, so we create a temporary one.
+ // No persistent or temporary socket to the socket address
+ // currently exists, so we create a temporary one.
Try<Socket> create = Socket::create();
if (create.isError()) {
VLOG(1) << "Failed to send, create socket: " << create.error();
@@ -1615,8 +1618,8 @@ void SocketManager::send(Message* message)
int s = socket.get();
sockets[s] = new Socket(socket.get());
- nodes[s] = node;
- temps[node] = s;
+ addresses[s] = address;
+ temps[address] = s;
dispose.insert(s);
@@ -1629,7 +1632,7 @@ void SocketManager::send(Message* message)
if (connect) {
CHECK_SOME(socket);
- Socket(socket.get()).connect(node) // Copy to drop const.
+ Socket(socket.get()).connect(address) // Copy to drop const.
.onAny(lambda::bind(
&internal::send_connect,
lambda::_1,
@@ -1679,11 +1682,11 @@ Encoder* SocketManager::next(int s)
// This is either a temporary socket we created or it's a
// socket that we were receiving data from and possibly
// sending HTTP responses back on. Clean up either way.
- if (nodes.count(s) > 0) {
- const Node& node = nodes[s];
- CHECK(temps.count(node) > 0 && temps[node] == s);
- temps.erase(node);
- nodes.erase(s);
+ if (addresses.count(s) > 0) {
+ const Address& address = addresses[s];
+ CHECK(temps.count(address) > 0 && temps[address] == s);
+ temps.erase(address);
+ addresses.erase(s);
}
if (proxies.count(s) > 0) {
@@ -1739,19 +1742,19 @@ void SocketManager::close(int s)
outgoing.erase(s);
}
- // Clean up after sockets used for node communication.
- if (nodes.count(s) > 0) {
- const Node& node = nodes[s];
+ // Clean up after sockets used for remote communication.
+ if (addresses.count(s) > 0) {
+ const Address& address = addresses[s];
// Don't bother invoking exited unless socket was persistant.
- if (persists.count(node) > 0 && persists[node] == s) {
- persists.erase(node);
- exited(node); // Generate ExitedEvent(s)!
- } else if (temps.count(node) > 0 && temps[node] == s) {
- temps.erase(node);
+ if (persists.count(address) > 0 && persists[address] == s) {
+ persists.erase(address);
+ exited(address); // Generate ExitedEvent(s)!
+ } else if (temps.count(address) > 0 && temps[address] == s) {
+ temps.erase(address);
}
- nodes.erase(s);
+ addresses.erase(s);
}
// Clean up any proxy associated with this socket.
@@ -1803,18 +1806,18 @@ void SocketManager::close(int s)
}
-void SocketManager::exited(const Node& node)
+void SocketManager::exited(const Address& address)
{
// TODO(benh): It would be cleaner if this routine could call back
// into ProcessManager ... then we wouldn't have to convince
// ourselves that the accesses to each Process object will always be
// valid.
synchronized (this) {
- if (!links.remotes.contains(node)) {
- return; // No linkees for this node!
+ if (!links.remotes.contains(address)) {
+ return; // No linkees for this socket address!
}
- foreach (const UPID& linkee, links.remotes[node]) {
+ foreach (const UPID& linkee, links.remotes[address]) {
// Find and notify the linkers.
CHECK(links.linkers.contains(linkee));
@@ -1833,7 +1836,7 @@ void SocketManager::exited(const Node& node)
links.linkers.erase(linkee);
}
- links.remotes.erase(node);
+ links.remotes.erase(address);
}
}
@@ -1865,12 +1868,12 @@ void SocketManager::exited(ProcessBase* process)
// The exited process was the last linker for this linkee,
// so we need to remove the linkee from the remotes.
- if (linkee.node != __node__) {
- CHECK(links.remotes.contains(linkee.node));
+ if (linkee.address != __address__) {
+ CHECK(links.remotes.contains(linkee.address));
- links.remotes[linkee.node].erase(linkee);
- if (links.remotes[linkee.node].empty()) {
- links.remotes.erase(linkee.node);
+ links.remotes[linkee.address].erase(linkee);
+ if (links.remotes[linkee.address].empty()) {
+ links.remotes.erase(linkee.address);
}
}
}
@@ -1932,7 +1935,7 @@ ProcessManager::~ProcessManager()
ProcessReference ProcessManager::use(const UPID& pid)
{
- if (pid.node == __node__) {
+ if (pid.address == __address__) {
synchronized (processes) {
if (processes.count(pid.id) > 0) {
// Note that the ProcessReference constructor _must_ get
@@ -2038,12 +2041,12 @@ bool ProcessManager::handle(
if (tokens.size() == 0 && delegate != "") {
request->path = "/" + delegate;
- receiver = use(UPID(delegate, __node__));
+ receiver = use(UPID(delegate, __address__));
} else if (tokens.size() > 0) {
// Decode possible percent-encoded path.
Try<string> decode = http::decode(tokens[0]);
if (!decode.isError()) {
- receiver = use(UPID(decode.get(), __node__));
+ receiver = use(UPID(decode.get(), __address__));
} else {
VLOG(1) << "Failed to decode URL path: " << decode.error();
}
@@ -2052,7 +2055,7 @@ bool ProcessManager::handle(
if (!receiver && delegate != "") {
// Try and delegate the request.
request->path = "/" + delegate + request->path;
- receiver = use(UPID(delegate, __node__));
+ receiver = use(UPID(delegate, __address__));
}
if (receiver) {
@@ -2363,7 +2366,7 @@ void ProcessManager::cleanup(ProcessBase* process)
void ProcessManager::link(ProcessBase* process, const UPID& to)
{
// Check if the pid is local.
- if (to.node != __node__) {
+ if (to.address != __address__) {
socket_manager->link(process, to);
} else {
// Since the pid is local we want to get a reference to it's
@@ -2665,7 +2668,7 @@ ProcessBase::ProcessBase(const string& id)
refs = 0;
pid.id = id != "" ? id : ID::generate();
- pid.node = __node__;
+ pid.address = __address__;
// If using a manual clock, try and set current time of process
// using happens before relationship between creator (__process__)
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index fb5a5ab..0e1cebb 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -71,15 +71,23 @@ const Socket::Kind& Socket::DEFAULT_KIND()
}
-Try<Node> Socket::Impl::bind(const Node& node)
+Try<Address> Socket::Impl::address() const
{
- Try<int> bind = network::bind(get(), node);
+ // TODO(benh): Cache this result so that we don't have to make
+ // unnecessary system calls each time.
+ return network::address(get());
+}
+
+
+Try<Address> Socket::Impl::bind(const Address& address)
+{
+ Try<int> bind = network::bind(get(), address);
if (bind.isError()) {
return Error(bind.error());
}
// Lookup and store assigned IP and assigned port.
- return network::getsockname(get(), AF_INET);
+ return network::address(get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index b286259..a927e4e 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -108,9 +108,9 @@ public:
private:
void ping(const UPID& from, const string& body)
{
- if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
+ if (linkedPorts.find(from.address.port) == linkedPorts.end()) {
setLink(from);
- linkedPorts.insert(from.node.port);
+ linkedPorts.insert(from.address.port);
}
static const string message("hi");
send(from, "pong", message.c_str(), message.size());
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index aaa1646..0a00ae5 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -123,7 +123,7 @@ TEST(HTTP, Endpoints)
Socket socket = create.get();
- AWAIT_READY(socket.connect(process.self().node));
+ AWAIT_READY(socket.connect(process.self().address));
std::ostringstream out;
out << "GET /" << process.self().id << "/body"
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index f9b2dc0..8ce1e6e 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- UPID upid("metrics", process::node());
+ UPID upid("metrics", process::address());
Clock::pause();
@@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- UPID upid("metrics", process::node());
+ UPID upid("metrics", process::address());
Clock::pause();
@@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
// Ensures that the aggregate statistics are correct in the snapshot.
TEST(Metrics, SnapshotStatistics)
{
- UPID upid("metrics", process::node());
+ UPID upid("metrics", process::address());
Clock::pause();
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 1bcb3c6..9f9220e 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -41,6 +41,7 @@
using namespace process;
+using process::network::Address;
using process::network::Socket;
using std::string;
@@ -1468,7 +1469,7 @@ TEST(Process, remote)
Socket socket = create.get();
- AWAIT_READY(socket.connect(process.self().node));
+ AWAIT_READY(socket.connect(process.self().address));
Message message;
message.name = "handler";
@@ -1531,14 +1532,14 @@ TEST(Process, http2)
Socket socket = create.get();
- ASSERT_SOME(socket.bind(Node()));
+ ASSERT_SOME(socket.bind(Address()));
// Create a UPID for 'Libprocess-From' based on the IP and port we
// got assigned.
- Try<Node> node = network::getsockname(socket.get(), AF_INET);
- ASSERT_SOME(node);
+ Try<Address> address = socket.address();
+ ASSERT_SOME(address);
- UPID from("", node.get());
+ UPID from("", address.get());
ASSERT_SOME(socket.listen(1));
@@ -1869,7 +1870,7 @@ TEST(Process, PercentEncodedURLs)
spawn(process);
// Construct the PID using percent-encoding.
- UPID pid("id%2842%29", process.self().node);
+ UPID pid("id%2842%29", process.self().address);
// Mimic a libprocess message sent to an installed handler.
Future<Nothing> handler1;