You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by dm...@apache.org on 2014/11/12 22:05:38 UTC
[1/2] mesos git commit: libprocess: Replaced the ip and port pairs
from UPID class and process namespace with Node class.
Repository: mesos
Updated Branches:
refs/heads/master 3f693f23a -> 76bfb4930
libprocess: Replaced the ip and port pairs from UPID class and process namespace with Node class.
At the moment, the Node class is used to keep a mapping from a socket to the ip & port pair in the process namespace.
I want to propose to extend its use by replacing the ip & port fields from the UPID class and process namespace with this type.
Review: https://reviews.apache.org/r/27446
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f64562fa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f64562fa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f64562fa
Branch: refs/heads/master
Commit: f64562fa66a272b695560971d0e548d131f42682
Parents: 3f693f2
Author: Evelina Dumitrescu <ev...@gmail.com>
Authored: Wed Nov 12 12:56:23 2014 -0800
Committer: Dominic Hamon <dh...@twitter.com>
Committed: Wed Nov 12 12:56:47 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/node.hpp | 28 ++++++-
3rdparty/libprocess/include/process/pid.hpp | 49 +++++------
3rdparty/libprocess/include/process/process.hpp | 11 +--
3rdparty/libprocess/src/http.cpp | 13 +--
3rdparty/libprocess/src/pid.cpp | 28 +++----
3rdparty/libprocess/src/process.cpp | 88 ++++++++------------
3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
3rdparty/libprocess/src/tests/http_tests.cpp | 4 +-
3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
3rdparty/libprocess/src/tests/process_tests.cpp | 6 +-
10 files changed, 103 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/node.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/node.hpp b/3rdparty/libprocess/include/process/node.hpp
index 7a96894..24132a5 100644
--- a/3rdparty/libprocess/include/process/node.hpp
+++ b/3rdparty/libprocess/include/process/node.hpp
@@ -1,17 +1,22 @@
#ifndef __PROCESS_NODE_HPP__
#define __PROCESS_NODE_HPP__
+#include <arpa/inet.h>
#include <unistd.h>
#include <sstream>
+#include <glog/logging.h>
+
namespace process {
// Represents a remote "node" (encapsulates IP address and port).
class Node
{
public:
- Node(uint32_t _ip = 0, uint16_t _port = 0) : ip(_ip), port(_port) {}
+ Node() : ip(0), port(0) {}
+
+ Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
bool operator < (const Node& that) const
{
@@ -22,16 +27,31 @@ public:
}
}
- std::ostream& operator << (std::ostream& stream) const
+ bool operator == (const Node& that) const
+ {
+ return (ip == that.ip && port == that.port);
+ }
+
+ bool operator != (const Node& that) const
{
- stream << ip << ":" << port;
- return stream;
+ return !(*this == that);
}
uint32_t ip;
uint16_t port;
};
+inline std::ostream& operator << (std::ostream & stream, const Node & node)
+{
+ char ip[INET_ADDRSTRLEN];
+ if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) == NULL) {
+ PLOG(FATAL) << "Failed to get human-readable IP address for '"
+ << node.ip << "'";
+ }
+ stream << ip << ":" << node.port;
+ return stream;
+}
+
} // namespace process {
#endif // __PROCESS_NODE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 2345322..7dccf29 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -7,6 +7,7 @@
#include <sstream>
#include <string>
+#include <process/node.hpp>
namespace process {
@@ -16,17 +17,22 @@ class ProcessBase;
struct UPID
{
- UPID()
- : ip(0), port(0) {}
+ UPID() = default;
UPID(const UPID& that)
- : id(that.id), ip(that.ip), port(that.port) {}
+ : id(that.id), node(that.node) {}
UPID(const char* id_, uint32_t ip_, uint16_t port_)
- : id(id_), ip(ip_), port(port_) {}
+ : id(id_), node(ip_, port_) {}
+
+ UPID(const char* id_, const Node& node_)
+ : id(id_), node(node_) {}
UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
- : id(id_), ip(ip_), port(port_) {}
+ : id(id_), node(ip_, port_) {}
+
+ UPID(const std::string& id_, const Node& node_)
+ : id(id_), node(node_) {}
/*implicit*/ UPID(const char* s);
@@ -38,47 +44,33 @@ struct UPID
operator bool () const
{
- return id != "" && ip != 0 && port != 0;
+ return id != "" && node.ip != 0 && node.port != 0;
}
bool operator ! () const // NOLINT(whitespace/operators)
{
- return id == "" && ip == 0 && port == 0;
+ return id == "" && node.ip == 0 && node.port == 0;
}
bool operator < (const UPID& that) const
{
- if (this != &that) {
- if (ip == that.ip && port == that.port)
- return id < that.id;
- else if (ip == that.ip && port != that.port)
- return port < that.port;
- else
- return ip < that.ip;
- }
-
- return false;
+ if (node == that.node)
+ return id < that.id;
+ else return node < that.node;
}
bool operator == (const UPID& that) const
{
- if (this != &that) {
- return (id == that.id &&
- ip == that.ip &&
- port == that.port);
- }
-
- return true;
+ return (id == that.id && node == that.node);
}
bool operator != (const UPID& that) const
{
- return !(this->operator == (that));
+ return !(*this == that);
}
std::string id;
- uint32_t ip;
- uint16_t port;
+ Node node;
};
@@ -99,8 +91,7 @@ struct PID : UPID
(void)base; // Eliminate unused base warning.
PID<Base> pid;
pid.id = id;
- pid.ip = ip;
- pid.port = port;
+ pid.node = node;
return pid;
}
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 81a1f7a..cb3e0a6 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -276,16 +276,9 @@ void finalize();
/**
- * Returns the IP address associated with this instance of the
- * library.
+ * Returns the node associated with this instance of the library.
*/
-uint32_t ip();
-
-
-/**
- * Returns the port associated with this instance of the library.
- */
-uint16_t port();
+Node node();
/**
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 4ef00d1..b00f333 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -79,20 +79,13 @@ Future<Response> request(
return Failure("Failed to cloexec: " + cloexec.error());
}
- // We use inet_ntop since inet_ntoa is not thread-safe!
- char ip[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) == NULL) {
- return Failure(ErrnoError("Failed to get human-readable IP address for '" +
- stringify(upid.ip) + "'"));
- }
-
- const string host = string(ip) + ":" + stringify(upid.port);
+ const string host = stringify(upid.node);
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
- addr.sin_port = htons(upid.port);
- addr.sin_addr.s_addr = upid.ip;
+ addr.sin_port = htons(upid.node.port);
+ addr.sin_addr.s_addr = upid.node.ip;
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
os::close(s);
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index 20ff25c..a2c620e 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -47,8 +47,7 @@ UPID::UPID(const std::string& s)
UPID::UPID(const ProcessBase& process)
{
id = process.self().id;
- ip = process.self().ip;
- port = process.self().port;
+ node = process.self().node;
}
@@ -62,12 +61,7 @@ UPID::operator std::string() const
ostream& operator << (ostream& stream, const UPID& pid)
{
- // Call inet_ntop since inet_ntoa is not thread-safe!
- char ip[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr *) &pid.ip, ip, INET_ADDRSTRLEN) == NULL)
- memset(ip, 0, INET_ADDRSTRLEN);
-
- stream << pid.id << "@" << ip << ":" << pid.port;
+ stream << pid.id << "@" << pid.node;
return stream;
}
@@ -75,8 +69,8 @@ ostream& operator << (ostream& stream, const UPID& pid)
istream& operator >> (istream& stream, UPID& pid)
{
pid.id = "";
- pid.ip = 0;
- pid.port = 0;
+ pid.node.ip = 0;
+ pid.node.port = 0;
string str;
if (!(stream >> str)) {
@@ -93,8 +87,7 @@ istream& operator >> (istream& stream, UPID& pid)
string id;
string host;
- uint32_t ip;
- uint16_t port;
+ Node node;
size_t index = str.find('@');
@@ -149,20 +142,19 @@ istream& operator >> (istream& stream, UPID& pid)
return stream;
}
- ip = *((uint32_t*) hep->h_addr_list[0]);
+ node.ip = *((uint32_t*) hep->h_addr_list[0]);
delete[] temp;
str = str.substr(index + 1);
- if (sscanf(str.c_str(), "%hu", &port) != 1) {
+ if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
stream.setstate(std::ios_base::badbit);
return stream;
}
pid.id = id;
- pid.ip = ip;
- pid.port = port;
+ pid.node = node;
return stream;
}
@@ -172,8 +164,8 @@ size_t hash_value(const UPID& pid)
{
size_t seed = 0;
boost::hash_combine(seed, pid.id);
- boost::hash_combine(seed, pid.ip);
- boost::hash_combine(seed, pid.port);
+ boost::hash_combine(seed, pid.node.ip);
+ boost::hash_combine(seed, pid.node.port);
return seed;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 85fb995..a34b870 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -436,11 +436,8 @@ static uint32_t __id__ = 0;
// Local server socket.
static int __s__ = -1;
-// Local IP address.
-static uint32_t __ip__ = 0;
-
-// Local port.
-static uint16_t __port__ = 0;
+// Local node.
+static Node __node__;
// Active SocketManager (eventually will probably be thread-local).
static SocketManager* socket_manager = NULL;
@@ -709,7 +706,7 @@ static Message* encode(const UPID& from,
static void transport(Message* message, ProcessBase* sender = NULL)
{
- if (message->to.ip == __ip__ && message->to.port == __port__) {
+ if (message->to.node == __node__) {
// Local message.
process_manager->deliver(message->to, new MessageEvent(message), sender);
} else {
@@ -766,7 +763,7 @@ static Message* parse(Request* request)
return NULL;
}
- const UPID to(decode.get(), __ip__, __port__);
+ const UPID to(decode.get(), __node__);
// And now determine 'name'.
index = index != string::npos ? index + 2: request->path.size();
@@ -1472,15 +1469,15 @@ void initialize(const string& delegate)
}
}
- __ip__ = 0;
- __port__ = 0;
+ __node__.ip = 0;
+ __node__.port = 0;
char* value;
// Check environment for ip.
value = getenv("LIBPROCESS_IP");
if (value != NULL) {
- int result = inet_pton(AF_INET, value, &__ip__);
+ int result = inet_pton(AF_INET, value, &__node__.ip);
if (result == 0) {
LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
} else if (result < 0) {
@@ -1495,7 +1492,7 @@ void initialize(const string& delegate)
if (result < 0 || result > USHRT_MAX) {
LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
}
- __port__ = result;
+ __node__.port = result;
}
// Create a "server" socket for communicating with other nodes.
@@ -1525,12 +1522,11 @@ void initialize(const string& delegate)
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = __ip__;
- addr.sin_port = htons(__port__);
+ addr.sin_addr.s_addr = __node__.ip;
+ addr.sin_port = htons(__node__.port);
if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
- PLOG(FATAL) << "Failed to initialize, bind "
- << inet_ntoa(addr.sin_addr) << ":" << __port__;
+ PLOG(FATAL) << "Failed to initialize, bind " << __node__;
}
// Lookup and store assigned ip and assigned port.
@@ -1539,14 +1535,14 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize, getsockname";
}
- __ip__ = addr.sin_addr.s_addr;
- __port__ = ntohs(addr.sin_port);
+ __node__.ip = addr.sin_addr.s_addr;
+ __node__.port = ntohs(addr.sin_port);
// Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
// actually have a valid external ip address. Note that we need only
// one ip address, so that other processes can send and receive and
// don't get confused as to whom they are sending to.
- if (__ip__ == 0 || __ip__ == 2130706433) {
+ if (__node__.ip == 0 || __node__.ip == 2130706433) {
char hostname[512];
if (gethostname(hostname, sizeof(hostname)) < 0) {
@@ -1562,7 +1558,7 @@ void initialize(const string& delegate)
<< hstrerror(h_errno);
}
- __ip__ = *((uint32_t *) he->h_addr_list[0]);
+ __node__.ip = *((uint32_t *) he->h_addr_list[0]);
}
if (listen(__s__, 500000) < 0) {
@@ -1663,13 +1659,8 @@ void initialize(const string& delegate)
new Route("/__processes__", None(), __processes__);
- char temp[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
- PLOG(FATAL) << "Failed to initialize, inet_ntop";
- }
-
- VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
- << " for " << cpus << " cpus";
+ VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
+ << " cpus";
}
@@ -1679,17 +1670,10 @@ void finalize()
}
-uint32_t ip()
-{
- process::initialize();
- return __ip__;
-}
-
-
-uint16_t port()
+Node node()
{
process::initialize();
- return __port__;
+ return __node__;
}
@@ -1968,12 +1952,9 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
CHECK(process != NULL);
- Node node(to.ip, to.port);
-
synchronized (this) {
// Check if node is remote and there isn't a persistant link.
- if ((node.ip != __ip__ || node.port != __port__)
- && persists.count(node) == 0) {
+ if (to.node != __node__ && persists.count(to.node) == 0) {
// Okay, no link, let's create a socket.
Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
if (socket.isError()) {
@@ -1993,9 +1974,9 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
}
sockets[s] = Socket(s);
- nodes[s] = node;
+ nodes[s] = to.node;
- persists[node] = s;
+ persists[to.node] = s;
// Allocate and initialize a watcher for reading data from this
// socket. Note that we don't expect to receive anything other
@@ -2009,8 +1990,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
- addr.sin_port = htons(to.port);
- addr.sin_addr.s_addr = to.ip;
+ addr.sin_port = htons(to.node.port);
+ addr.sin_addr.s_addr = to.node.ip;
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
@@ -2130,7 +2111,7 @@ void SocketManager::send(Message* message)
{
CHECK(message != NULL);
- Node node(message->to.ip, message->to.port);
+ Node node(message->to.node);
synchronized (this) {
// Check if there is already a socket.
@@ -2190,8 +2171,8 @@ void SocketManager::send(Message* message)
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
- addr.sin_port = htons(message->to.port);
- addr.sin_addr.s_addr = message->to.ip;
+ addr.sin_port = htons(node.port);
+ addr.sin_addr.s_addr = node.ip;
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
@@ -2380,7 +2361,7 @@ void SocketManager::exited(const Node& node)
list<UPID> removed;
// Look up all linked processes.
foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
- if (linkee.ip == node.ip && linkee.port == node.port) {
+ if (linkee.node == node) {
foreach (ProcessBase* linker, processes) {
linker->enqueue(new ExitedEvent(linkee));
}
@@ -2461,7 +2442,7 @@ ProcessManager::~ProcessManager()
ProcessReference ProcessManager::use(const UPID& pid)
{
- if (pid.ip == __ip__ && pid.port == __port__) {
+ if (pid.node == __node__) {
synchronized (processes) {
if (processes.count(pid.id) > 0) {
// Note that the ProcessReference constructor _must_ get
@@ -2567,12 +2548,12 @@ bool ProcessManager::handle(
if (tokens.size() == 0 && delegate != "") {
request->path = "/" + delegate;
- receiver = use(UPID(delegate, __ip__, __port__));
+ receiver = use(UPID(delegate, __node__));
} else if (tokens.size() > 0) {
// Decode possible percent-encoded path.
Try<string> decode = http::decode(tokens[0]);
if (!decode.isError()) {
- receiver = use(UPID(decode.get(), __ip__, __port__));
+ receiver = use(UPID(decode.get(), __node__));
} else {
VLOG(1) << "Failed to decode URL path: " << decode.error();
}
@@ -2581,7 +2562,7 @@ bool ProcessManager::handle(
if (!receiver && delegate != "") {
// Try and delegate the request.
request->path = "/" + delegate + request->path;
- receiver = use(UPID(delegate, __ip__, __port__));
+ receiver = use(UPID(delegate, __node__));
}
if (receiver) {
@@ -2900,7 +2881,7 @@ void ProcessManager::cleanup(ProcessBase* process)
void ProcessManager::link(ProcessBase* process, const UPID& to)
{
// Check if the pid is local.
- if (!(to.ip == __ip__ && to.port == __port__)) {
+ if (to.node != __node__) {
socket_manager->link(process, to);
} else {
// Since the pid is local we want to get a reference to it's
@@ -3249,8 +3230,7 @@ ProcessBase::ProcessBase(const string& id)
refs = 0;
pid.id = id != "" ? id : ID::generate();
- pid.ip = __ip__;
- pid.port = __port__;
+ pid.node = __node__;
// If using a manual clock, try and set current time of process
// using happens before relationship between creator and createe!
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 3177a8e..227b8e7 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -102,9 +102,9 @@ public:
private:
void ping(const UPID& from, const string& body)
{
- if (linkedPorts.find(from.port) == linkedPorts.end()) {
+ if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
setLink(from);
- linkedPorts.insert(from.port);
+ linkedPorts.insert(from.node.port);
}
static const string message("hi");
send(from, "pong", message.c_str(), message.size());
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index a1c3685..a90e65f 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -120,8 +120,8 @@ TEST(HTTP, Endpoints)
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
- addr.sin_port = htons(process.self().port);
- addr.sin_addr.s_addr = process.self().ip;
+ addr.sin_port = htons(process.self().node.port);
+ addr.sin_addr.s_addr = process.self().node.ip;
ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 33539e4..0c80c69 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- UPID upid("metrics", process::ip(), process::port());
+ UPID upid("metrics", process::node());
Clock::pause();
@@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- UPID upid("metrics", process::ip(), process::port());
+ UPID upid("metrics", process::node());
Clock::pause();
@@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
// Ensures that the aggregate statistics are correct in the snapshot.
TEST(Metrics, SnapshotStatistics)
{
- UPID upid("metrics", process::ip(), process::port());
+ UPID upid("metrics", process::node());
Clock::pause();
http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index b985fb7..902d4d3 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1425,8 +1425,8 @@ TEST(Process, remote)
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
- addr.sin_port = htons(process.self().port);
- addr.sin_addr.s_addr = process.self().ip;
+ addr.sin_port = htons(process.self().node.port);
+ addr.sin_addr.s_addr = process.self().node.ip;
ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
@@ -1866,7 +1866,7 @@ TEST(Process, PercentEncodedURLs)
spawn(process);
// Construct the PID using percent-encoding.
- UPID pid("id%2842%29", process.self().ip, process.self().port);
+ UPID pid("id%2842%29", process.self().node);
// Mimic a libprocess message sent to an installed handler.
Future<Nothing> handler1;
Re: [1/2] mesos git commit: libprocess: Replaced the ip and port
pairs from UPID class and process namespace with Node class.
Posted by Dominic Hamon <dh...@twopensource.com>.
Landed. Sorry for the noise.
On Wed, Nov 12, 2014 at 2:17 PM, Dominic Hamon <dh...@twopensource.com>
wrote:
> https://reviews.apache.org/r/27932/
>
> On Wed, Nov 12, 2014 at 2:15 PM, Dominic Hamon <dh...@twopensource.com>
> wrote:
>
>> thanks for the head's up - it didn't show up in my linux clang build.
>>
>> testing a fix now.
>>
>> On Wed, Nov 12, 2014 at 2:12 PM, Cody Maloney <co...@mesosphere.io> wrote:
>>
>>> I'm getting a new warning with this on OS X w/ clang 3.5 which looks like
>>> it probably should be addressed (Add parens to call the node function):
>>>
>>> ../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49:
>>> warning:
>>> address of function 'process::node' will always evaluate to 'true'
>>> [-Wbool-conversion]
>>> VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
>>> ~~~~~~~ ^~~~
>>> ../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49: note:
>>> prefix with the address-of operator to silence this warning
>>> VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
>>> ^
>>> &
>>>
>>> On Wed, Nov 12, 2014 at 1:05 PM, <dm...@apache.org> wrote:
>>>
>>> > Repository: mesos
>>> > Updated Branches:
>>> > refs/heads/master 3f693f23a -> 76bfb4930
>>> >
>>> >
>>> > libprocess: Replaced the ip and port pairs from UPID class and process
>>> > namespace with Node class.
>>> >
>>> > At the moment, the Node class is used to keep a mapping from a socket
>>> to
>>> > the ip & port pair in the process namespace.
>>> > I want to propose to extend its use by replacing the ip & port fields
>>> from
>>> > the UPID class and process namespace with this type.
>>> >
>>> > Review: https://reviews.apache.org/r/27446
>>> >
>>> >
>>> > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>>> > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f64562fa
>>> > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f64562fa
>>> > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f64562fa
>>> >
>>> > Branch: refs/heads/master
>>> > Commit: f64562fa66a272b695560971d0e548d131f42682
>>> > Parents: 3f693f2
>>> > Author: Evelina Dumitrescu <ev...@gmail.com>
>>> > Authored: Wed Nov 12 12:56:23 2014 -0800
>>> > Committer: Dominic Hamon <dh...@twitter.com>
>>> > Committed: Wed Nov 12 12:56:47 2014 -0800
>>> >
>>> > ----------------------------------------------------------------------
>>> > 3rdparty/libprocess/include/process/node.hpp | 28 ++++++-
>>> > 3rdparty/libprocess/include/process/pid.hpp | 49 +++++------
>>> > 3rdparty/libprocess/include/process/process.hpp | 11 +--
>>> > 3rdparty/libprocess/src/http.cpp | 13 +--
>>> > 3rdparty/libprocess/src/pid.cpp | 28 +++----
>>> > 3rdparty/libprocess/src/process.cpp | 88
>>> ++++++++------------
>>> > 3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
>>> > 3rdparty/libprocess/src/tests/http_tests.cpp | 4 +-
>>> > 3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
>>> > 3rdparty/libprocess/src/tests/process_tests.cpp | 6 +-
>>> > 10 files changed, 103 insertions(+), 134 deletions(-)
>>> > ----------------------------------------------------------------------
>>> >
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/node.hpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/include/process/node.hpp
>>> > b/3rdparty/libprocess/include/process/node.hpp
>>> > index 7a96894..24132a5 100644
>>> > --- a/3rdparty/libprocess/include/process/node.hpp
>>> > +++ b/3rdparty/libprocess/include/process/node.hpp
>>> > @@ -1,17 +1,22 @@
>>> > #ifndef __PROCESS_NODE_HPP__
>>> > #define __PROCESS_NODE_HPP__
>>> >
>>> > +#include <arpa/inet.h>
>>> > #include <unistd.h>
>>> >
>>> > #include <sstream>
>>> >
>>> > +#include <glog/logging.h>
>>> > +
>>> > namespace process {
>>> >
>>> > // Represents a remote "node" (encapsulates IP address and port).
>>> > class Node
>>> > {
>>> > public:
>>> > - Node(uint32_t _ip = 0, uint16_t _port = 0) : ip(_ip), port(_port) {}
>>> > + Node() : ip(0), port(0) {}
>>> > +
>>> > + Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
>>> >
>>> > bool operator < (const Node& that) const
>>> > {
>>> > @@ -22,16 +27,31 @@ public:
>>> > }
>>> > }
>>> >
>>> > - std::ostream& operator << (std::ostream& stream) const
>>> > + bool operator == (const Node& that) const
>>> > + {
>>> > + return (ip == that.ip && port == that.port);
>>> > + }
>>> > +
>>> > + bool operator != (const Node& that) const
>>> > {
>>> > - stream << ip << ":" << port;
>>> > - return stream;
>>> > + return !(*this == that);
>>> > }
>>> >
>>> > uint32_t ip;
>>> > uint16_t port;
>>> > };
>>> >
>>> > +inline std::ostream& operator << (std::ostream & stream, const Node &
>>> > node)
>>> > +{
>>> > + char ip[INET_ADDRSTRLEN];
>>> > + if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) ==
>>> > NULL) {
>>> > + PLOG(FATAL) << "Failed to get human-readable IP address for '"
>>> > + << node.ip << "'";
>>> > + }
>>> > + stream << ip << ":" << node.port;
>>> > + return stream;
>>> > +}
>>> > +
>>> > } // namespace process {
>>> >
>>> > #endif // __PROCESS_NODE_HPP__
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/pid.hpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/include/process/pid.hpp
>>> > b/3rdparty/libprocess/include/process/pid.hpp
>>> > index 2345322..7dccf29 100644
>>> > --- a/3rdparty/libprocess/include/process/pid.hpp
>>> > +++ b/3rdparty/libprocess/include/process/pid.hpp
>>> > @@ -7,6 +7,7 @@
>>> > #include <sstream>
>>> > #include <string>
>>> >
>>> > +#include <process/node.hpp>
>>> >
>>> > namespace process {
>>> >
>>> > @@ -16,17 +17,22 @@ class ProcessBase;
>>> >
>>> > struct UPID
>>> > {
>>> > - UPID()
>>> > - : ip(0), port(0) {}
>>> > + UPID() = default;
>>> >
>>> > UPID(const UPID& that)
>>> > - : id(that.id), ip(that.ip), port(that.port) {}
>>> > + : id(that.id), node(that.node) {}
>>> >
>>> > UPID(const char* id_, uint32_t ip_, uint16_t port_)
>>> > - : id(id_), ip(ip_), port(port_) {}
>>> > + : id(id_), node(ip_, port_) {}
>>> > +
>>> > + UPID(const char* id_, const Node& node_)
>>> > + : id(id_), node(node_) {}
>>> >
>>> > UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
>>> > - : id(id_), ip(ip_), port(port_) {}
>>> > + : id(id_), node(ip_, port_) {}
>>> > +
>>> > + UPID(const std::string& id_, const Node& node_)
>>> > + : id(id_), node(node_) {}
>>> >
>>> > /*implicit*/ UPID(const char* s);
>>> >
>>> > @@ -38,47 +44,33 @@ struct UPID
>>> >
>>> > operator bool () const
>>> > {
>>> > - return id != "" && ip != 0 && port != 0;
>>> > + return id != "" && node.ip != 0 && node.port != 0;
>>> > }
>>> >
>>> > bool operator ! () const // NOLINT(whitespace/operators)
>>> > {
>>> > - return id == "" && ip == 0 && port == 0;
>>> > + return id == "" && node.ip == 0 && node.port == 0;
>>> > }
>>> >
>>> > bool operator < (const UPID& that) const
>>> > {
>>> > - if (this != &that) {
>>> > - if (ip == that.ip && port == that.port)
>>> > - return id < that.id;
>>> > - else if (ip == that.ip && port != that.port)
>>> > - return port < that.port;
>>> > - else
>>> > - return ip < that.ip;
>>> > - }
>>> > -
>>> > - return false;
>>> > + if (node == that.node)
>>> > + return id < that.id;
>>> > + else return node < that.node;
>>> > }
>>> >
>>> > bool operator == (const UPID& that) const
>>> > {
>>> > - if (this != &that) {
>>> > - return (id == that.id &&
>>> > - ip == that.ip &&
>>> > - port == that.port);
>>> > - }
>>> > -
>>> > - return true;
>>> > + return (id == that.id && node == that.node);
>>> > }
>>> >
>>> > bool operator != (const UPID& that) const
>>> > {
>>> > - return !(this->operator == (that));
>>> > + return !(*this == that);
>>> > }
>>> >
>>> > std::string id;
>>> > - uint32_t ip;
>>> > - uint16_t port;
>>> > + Node node;
>>> > };
>>> >
>>> >
>>> > @@ -99,8 +91,7 @@ struct PID : UPID
>>> > (void)base; // Eliminate unused base warning.
>>> > PID<Base> pid;
>>> > pid.id = id;
>>> > - pid.ip = ip;
>>> > - pid.port = port;
>>> > + pid.node = node;
>>> > return pid;
>>> > }
>>> > };
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/process.hpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/include/process/process.hpp
>>> > b/3rdparty/libprocess/include/process/process.hpp
>>> > index 81a1f7a..cb3e0a6 100644
>>> > --- a/3rdparty/libprocess/include/process/process.hpp
>>> > +++ b/3rdparty/libprocess/include/process/process.hpp
>>> > @@ -276,16 +276,9 @@ void finalize();
>>> >
>>> >
>>> > /**
>>> > - * Returns the IP address associated with this instance of the
>>> > - * library.
>>> > + * Returns the node associated with this instance of the library.
>>> > */
>>> > -uint32_t ip();
>>> > -
>>> > -
>>> > -/**
>>> > - * Returns the port associated with this instance of the library.
>>> > - */
>>> > -uint16_t port();
>>> > +Node node();
>>> >
>>> >
>>> > /**
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/http.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/http.cpp
>>> > b/3rdparty/libprocess/src/http.cpp
>>> > index 4ef00d1..b00f333 100644
>>> > --- a/3rdparty/libprocess/src/http.cpp
>>> > +++ b/3rdparty/libprocess/src/http.cpp
>>> > @@ -79,20 +79,13 @@ Future<Response> request(
>>> > return Failure("Failed to cloexec: " + cloexec.error());
>>> > }
>>> >
>>> > - // We use inet_ntop since inet_ntoa is not thread-safe!
>>> > - char ip[INET_ADDRSTRLEN];
>>> > - if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) ==
>>> > NULL) {
>>> > - return Failure(ErrnoError("Failed to get human-readable IP address
>>> > for '" +
>>> > - stringify(upid.ip) + "'"));
>>> > - }
>>> > -
>>> > - const string host = string(ip) + ":" + stringify(upid.port);
>>> > + const string host = stringify(upid.node);
>>> >
>>> > sockaddr_in addr;
>>> > memset(&addr, 0, sizeof(addr));
>>> > addr.sin_family = AF_INET;
>>> > - addr.sin_port = htons(upid.port);
>>> > - addr.sin_addr.s_addr = upid.ip;
>>> > + addr.sin_port = htons(upid.node.port);
>>> > + addr.sin_addr.s_addr = upid.node.ip;
>>> >
>>> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
>>> > os::close(s);
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/pid.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/pid.cpp
>>> > b/3rdparty/libprocess/src/pid.cpp
>>> > index 20ff25c..a2c620e 100644
>>> > --- a/3rdparty/libprocess/src/pid.cpp
>>> > +++ b/3rdparty/libprocess/src/pid.cpp
>>> > @@ -47,8 +47,7 @@ UPID::UPID(const std::string& s)
>>> > UPID::UPID(const ProcessBase& process)
>>> > {
>>> > id = process.self().id;
>>> > - ip = process.self().ip;
>>> > - port = process.self().port;
>>> > + node = process.self().node;
>>> > }
>>> >
>>> >
>>> > @@ -62,12 +61,7 @@ UPID::operator std::string() const
>>> >
>>> > ostream& operator << (ostream& stream, const UPID& pid)
>>> > {
>>> > - // Call inet_ntop since inet_ntoa is not thread-safe!
>>> > - char ip[INET_ADDRSTRLEN];
>>> > - if (inet_ntop(AF_INET, (in_addr *) &pid.ip, ip, INET_ADDRSTRLEN) ==
>>> > NULL)
>>> > - memset(ip, 0, INET_ADDRSTRLEN);
>>> > -
>>> > - stream << pid.id << "@" << ip << ":" << pid.port;
>>> > + stream << pid.id << "@" << pid.node;
>>> > return stream;
>>> > }
>>> >
>>> > @@ -75,8 +69,8 @@ ostream& operator << (ostream& stream, const UPID&
>>> pid)
>>> > istream& operator >> (istream& stream, UPID& pid)
>>> > {
>>> > pid.id = "";
>>> > - pid.ip = 0;
>>> > - pid.port = 0;
>>> > + pid.node.ip = 0;
>>> > + pid.node.port = 0;
>>> >
>>> > string str;
>>> > if (!(stream >> str)) {
>>> > @@ -93,8 +87,7 @@ istream& operator >> (istream& stream, UPID& pid)
>>> >
>>> > string id;
>>> > string host;
>>> > - uint32_t ip;
>>> > - uint16_t port;
>>> > + Node node;
>>> >
>>> > size_t index = str.find('@');
>>> >
>>> > @@ -149,20 +142,19 @@ istream& operator >> (istream& stream, UPID& pid)
>>> > return stream;
>>> > }
>>> >
>>> > - ip = *((uint32_t*) hep->h_addr_list[0]);
>>> > + node.ip = *((uint32_t*) hep->h_addr_list[0]);
>>> >
>>> > delete[] temp;
>>> >
>>> > str = str.substr(index + 1);
>>> >
>>> > - if (sscanf(str.c_str(), "%hu", &port) != 1) {
>>> > + if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
>>> > stream.setstate(std::ios_base::badbit);
>>> > return stream;
>>> > }
>>> >
>>> > pid.id = id;
>>> > - pid.ip = ip;
>>> > - pid.port = port;
>>> > + pid.node = node;
>>> >
>>> > return stream;
>>> > }
>>> > @@ -172,8 +164,8 @@ size_t hash_value(const UPID& pid)
>>> > {
>>> > size_t seed = 0;
>>> > boost::hash_combine(seed, pid.id);
>>> > - boost::hash_combine(seed, pid.ip);
>>> > - boost::hash_combine(seed, pid.port);
>>> > + boost::hash_combine(seed, pid.node.ip);
>>> > + boost::hash_combine(seed, pid.node.port);
>>> > return seed;
>>> > }
>>> >
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/process.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/process.cpp
>>> > b/3rdparty/libprocess/src/process.cpp
>>> > index 85fb995..a34b870 100644
>>> > --- a/3rdparty/libprocess/src/process.cpp
>>> > +++ b/3rdparty/libprocess/src/process.cpp
>>> > @@ -436,11 +436,8 @@ static uint32_t __id__ = 0;
>>> > // Local server socket.
>>> > static int __s__ = -1;
>>> >
>>> > -// Local IP address.
>>> > -static uint32_t __ip__ = 0;
>>> > -
>>> > -// Local port.
>>> > -static uint16_t __port__ = 0;
>>> > +// Local node.
>>> > +static Node __node__;
>>> >
>>> > // Active SocketManager (eventually will probably be thread-local).
>>> > static SocketManager* socket_manager = NULL;
>>> > @@ -709,7 +706,7 @@ static Message* encode(const UPID& from,
>>> >
>>> > static void transport(Message* message, ProcessBase* sender = NULL)
>>> > {
>>> > - if (message->to.ip == __ip__ && message->to.port == __port__) {
>>> > + if (message->to.node == __node__) {
>>> > // Local message.
>>> > process_manager->deliver(message->to, new MessageEvent(message),
>>> > sender);
>>> > } else {
>>> > @@ -766,7 +763,7 @@ static Message* parse(Request* request)
>>> > return NULL;
>>> > }
>>> >
>>> > - const UPID to(decode.get(), __ip__, __port__);
>>> > + const UPID to(decode.get(), __node__);
>>> >
>>> > // And now determine 'name'.
>>> > index = index != string::npos ? index + 2: request->path.size();
>>> > @@ -1472,15 +1469,15 @@ void initialize(const string& delegate)
>>> > }
>>> > }
>>> >
>>> > - __ip__ = 0;
>>> > - __port__ = 0;
>>> > + __node__.ip = 0;
>>> > + __node__.port = 0;
>>> >
>>> > char* value;
>>> >
>>> > // Check environment for ip.
>>> > value = getenv("LIBPROCESS_IP");
>>> > if (value != NULL) {
>>> > - int result = inet_pton(AF_INET, value, &__ip__);
>>> > + int result = inet_pton(AF_INET, value, &__node__.ip);
>>> > if (result == 0) {
>>> > LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
>>> > } else if (result < 0) {
>>> > @@ -1495,7 +1492,7 @@ void initialize(const string& delegate)
>>> > if (result < 0 || result > USHRT_MAX) {
>>> > LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid
>>> port";
>>> > }
>>> > - __port__ = result;
>>> > + __node__.port = result;
>>> > }
>>> >
>>> > // Create a "server" socket for communicating with other nodes.
>>> > @@ -1525,12 +1522,11 @@ void initialize(const string& delegate)
>>> > sockaddr_in addr;
>>> > memset(&addr, 0, sizeof(addr));
>>> > addr.sin_family = PF_INET;
>>> > - addr.sin_addr.s_addr = __ip__;
>>> > - addr.sin_port = htons(__port__);
>>> > + addr.sin_addr.s_addr = __node__.ip;
>>> > + addr.sin_port = htons(__node__.port);
>>> >
>>> > if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
>>> > - PLOG(FATAL) << "Failed to initialize, bind "
>>> > - << inet_ntoa(addr.sin_addr) << ":" << __port__;
>>> > + PLOG(FATAL) << "Failed to initialize, bind " << __node__;
>>> > }
>>> >
>>> > // Lookup and store assigned ip and assigned port.
>>> > @@ -1539,14 +1535,14 @@ void initialize(const string& delegate)
>>> > PLOG(FATAL) << "Failed to initialize, getsockname";
>>> > }
>>> >
>>> > - __ip__ = addr.sin_addr.s_addr;
>>> > - __port__ = ntohs(addr.sin_port);
>>> > + __node__.ip = addr.sin_addr.s_addr;
>>> > + __node__.port = ntohs(addr.sin_port);
>>> >
>>> > // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
>>> > // actually have a valid external ip address. Note that we need only
>>> > // one ip address, so that other processes can send and receive and
>>> > // don't get confused as to whom they are sending to.
>>> > - if (__ip__ == 0 || __ip__ == 2130706433) {
>>> > + if (__node__.ip == 0 || __node__.ip == 2130706433) {
>>> > char hostname[512];
>>> >
>>> > if (gethostname(hostname, sizeof(hostname)) < 0) {
>>> > @@ -1562,7 +1558,7 @@ void initialize(const string& delegate)
>>> > << hstrerror(h_errno);
>>> > }
>>> >
>>> > - __ip__ = *((uint32_t *) he->h_addr_list[0]);
>>> > + __node__.ip = *((uint32_t *) he->h_addr_list[0]);
>>> > }
>>> >
>>> > if (listen(__s__, 500000) < 0) {
>>> > @@ -1663,13 +1659,8 @@ void initialize(const string& delegate)
>>> >
>>> > new Route("/__processes__", None(), __processes__);
>>> >
>>> > - char temp[INET_ADDRSTRLEN];
>>> > - if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) ==
>>> > NULL) {
>>> > - PLOG(FATAL) << "Failed to initialize, inet_ntop";
>>> > - }
>>> > -
>>> > - VLOG(1) << "libprocess is initialized on " << temp << ":" <<
>>> __port__
>>> > - << " for " << cpus << " cpus";
>>> > + VLOG(1) << "libprocess is initialized on " << node << " for " <<
>>> cpus
>>> > + << " cpus";
>>> > }
>>> >
>>> >
>>> > @@ -1679,17 +1670,10 @@ void finalize()
>>> > }
>>> >
>>> >
>>> > -uint32_t ip()
>>> > -{
>>> > - process::initialize();
>>> > - return __ip__;
>>> > -}
>>> > -
>>> > -
>>> > -uint16_t port()
>>> > +Node node()
>>> > {
>>> > process::initialize();
>>> > - return __port__;
>>> > + return __node__;
>>> > }
>>> >
>>> >
>>> > @@ -1968,12 +1952,9 @@ void SocketManager::link(ProcessBase* process,
>>> > const UPID& to)
>>> >
>>> > CHECK(process != NULL);
>>> >
>>> > - Node node(to.ip, to.port);
>>> > -
>>> > synchronized (this) {
>>> > // Check if node is remote and there isn't a persistant link.
>>> > - if ((node.ip != __ip__ || node.port != __port__)
>>> > - && persists.count(node) == 0) {
>>> > + if (to.node != __node__ && persists.count(to.node) == 0) {
>>> > // Okay, no link, let's create a socket.
>>> > Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
>>> > if (socket.isError()) {
>>> > @@ -1993,9 +1974,9 @@ void SocketManager::link(ProcessBase* process,
>>> const
>>> > UPID& to)
>>> > }
>>> >
>>> > sockets[s] = Socket(s);
>>> > - nodes[s] = node;
>>> > + nodes[s] = to.node;
>>> >
>>> > - persists[node] = s;
>>> > + persists[to.node] = s;
>>> >
>>> > // Allocate and initialize a watcher for reading data from this
>>> > // socket. Note that we don't expect to receive anything other
>>> > @@ -2009,8 +1990,8 @@ void SocketManager::link(ProcessBase* process,
>>> const
>>> > UPID& to)
>>> > sockaddr_in addr;
>>> > memset(&addr, 0, sizeof(addr));
>>> > addr.sin_family = PF_INET;
>>> > - addr.sin_port = htons(to.port);
>>> > - addr.sin_addr.s_addr = to.ip;
>>> > + addr.sin_port = htons(to.node.port);
>>> > + addr.sin_addr.s_addr = to.node.ip;
>>> >
>>> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
>>> > if (errno != EINPROGRESS) {
>>> > @@ -2130,7 +2111,7 @@ void SocketManager::send(Message* message)
>>> > {
>>> > CHECK(message != NULL);
>>> >
>>> > - Node node(message->to.ip, message->to.port);
>>> > + Node node(message->to.node);
>>> >
>>> > synchronized (this) {
>>> > // Check if there is already a socket.
>>> > @@ -2190,8 +2171,8 @@ void SocketManager::send(Message* message)
>>> > sockaddr_in addr;
>>> > memset(&addr, 0, sizeof(addr));
>>> > addr.sin_family = PF_INET;
>>> > - addr.sin_port = htons(message->to.port);
>>> > - addr.sin_addr.s_addr = message->to.ip;
>>> > + addr.sin_port = htons(node.port);
>>> > + addr.sin_addr.s_addr = node.ip;
>>> >
>>> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
>>> > if (errno != EINPROGRESS) {
>>> > @@ -2380,7 +2361,7 @@ void SocketManager::exited(const Node& node)
>>> > list<UPID> removed;
>>> > // Look up all linked processes.
>>> > foreachpair (const UPID& linkee, set<ProcessBase*>& processes,
>>> links)
>>> > {
>>> > - if (linkee.ip == node.ip && linkee.port == node.port) {
>>> > + if (linkee.node == node) {
>>> > foreach (ProcessBase* linker, processes) {
>>> > linker->enqueue(new ExitedEvent(linkee));
>>> > }
>>> > @@ -2461,7 +2442,7 @@ ProcessManager::~ProcessManager()
>>> >
>>> > ProcessReference ProcessManager::use(const UPID& pid)
>>> > {
>>> > - if (pid.ip == __ip__ && pid.port == __port__) {
>>> > + if (pid.node == __node__) {
>>> > synchronized (processes) {
>>> > if (processes.count(pid.id) > 0) {
>>> > // Note that the ProcessReference constructor _must_ get
>>> > @@ -2567,12 +2548,12 @@ bool ProcessManager::handle(
>>> >
>>> > if (tokens.size() == 0 && delegate != "") {
>>> > request->path = "/" + delegate;
>>> > - receiver = use(UPID(delegate, __ip__, __port__));
>>> > + receiver = use(UPID(delegate, __node__));
>>> > } else if (tokens.size() > 0) {
>>> > // Decode possible percent-encoded path.
>>> > Try<string> decode = http::decode(tokens[0]);
>>> > if (!decode.isError()) {
>>> > - receiver = use(UPID(decode.get(), __ip__, __port__));
>>> > + receiver = use(UPID(decode.get(), __node__));
>>> > } else {
>>> > VLOG(1) << "Failed to decode URL path: " << decode.error();
>>> > }
>>> > @@ -2581,7 +2562,7 @@ bool ProcessManager::handle(
>>> > if (!receiver && delegate != "") {
>>> > // Try and delegate the request.
>>> > request->path = "/" + delegate + request->path;
>>> > - receiver = use(UPID(delegate, __ip__, __port__));
>>> > + receiver = use(UPID(delegate, __node__));
>>> > }
>>> >
>>> > if (receiver) {
>>> > @@ -2900,7 +2881,7 @@ void ProcessManager::cleanup(ProcessBase*
>>> process)
>>> > void ProcessManager::link(ProcessBase* process, const UPID& to)
>>> > {
>>> > // Check if the pid is local.
>>> > - if (!(to.ip == __ip__ && to.port == __port__)) {
>>> > + if (to.node != __node__) {
>>> > socket_manager->link(process, to);
>>> > } else {
>>> > // Since the pid is local we want to get a reference to it's
>>> > @@ -3249,8 +3230,7 @@ ProcessBase::ProcessBase(const string& id)
>>> > refs = 0;
>>> >
>>> > pid.id = id != "" ? id : ID::generate();
>>> > - pid.ip = __ip__;
>>> > - pid.port = __port__;
>>> > + pid.node = __node__;
>>> >
>>> > // If using a manual clock, try and set current time of process
>>> > // using happens before relationship between creator and createe!
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> > b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> > index 3177a8e..227b8e7 100644
>>> > --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> > +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> > @@ -102,9 +102,9 @@ public:
>>> > private:
>>> > void ping(const UPID& from, const string& body)
>>> > {
>>> > - if (linkedPorts.find(from.port) == linkedPorts.end()) {
>>> > + if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
>>> > setLink(from);
>>> > - linkedPorts.insert(from.port);
>>> > + linkedPorts.insert(from.node.port);
>>> > }
>>> > static const string message("hi");
>>> > send(from, "pong", message.c_str(), message.size());
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/http_tests.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp
>>> > b/3rdparty/libprocess/src/tests/http_tests.cpp
>>> > index a1c3685..a90e65f 100644
>>> > --- a/3rdparty/libprocess/src/tests/http_tests.cpp
>>> > +++ b/3rdparty/libprocess/src/tests/http_tests.cpp
>>> > @@ -120,8 +120,8 @@ TEST(HTTP, Endpoints)
>>> > sockaddr_in addr;
>>> > memset(&addr, 0, sizeof(addr));
>>> > addr.sin_family = PF_INET;
>>> > - addr.sin_port = htons(process.self().port);
>>> > - addr.sin_addr.s_addr = process.self().ip;
>>> > + addr.sin_port = htons(process.self().node.port);
>>> > + addr.sin_addr.s_addr = process.self().node.ip;
>>> >
>>> > ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
>>> >
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/metrics_tests.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp
>>> > b/3rdparty/libprocess/src/tests/metrics_tests.cpp
>>> > index 33539e4..0c80c69 100644
>>> > --- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
>>> > +++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
>>> > @@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
>>> > {
>>> > ASSERT_TRUE(GTEST_IS_THREADSAFE);
>>> >
>>> > - UPID upid("metrics", process::ip(), process::port());
>>> > + UPID upid("metrics", process::node());
>>> >
>>> > Clock::pause();
>>> >
>>> > @@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
>>> > {
>>> > ASSERT_TRUE(GTEST_IS_THREADSAFE);
>>> >
>>> > - UPID upid("metrics", process::ip(), process::port());
>>> > + UPID upid("metrics", process::node());
>>> >
>>> > Clock::pause();
>>> >
>>> > @@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
>>> > // Ensures that the aggregate statistics are correct in the snapshot.
>>> > TEST(Metrics, SnapshotStatistics)
>>> > {
>>> > - UPID upid("metrics", process::ip(), process::port());
>>> > + UPID upid("metrics", process::node());
>>> >
>>> > Clock::pause();
>>> >
>>> >
>>> >
>>> >
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/process_tests.cpp
>>> > ----------------------------------------------------------------------
>>> > diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp
>>> > b/3rdparty/libprocess/src/tests/process_tests.cpp
>>> > index b985fb7..902d4d3 100644
>>> > --- a/3rdparty/libprocess/src/tests/process_tests.cpp
>>> > +++ b/3rdparty/libprocess/src/tests/process_tests.cpp
>>> > @@ -1425,8 +1425,8 @@ TEST(Process, remote)
>>> > sockaddr_in addr;
>>> > memset(&addr, 0, sizeof(addr));
>>> > addr.sin_family = PF_INET;
>>> > - addr.sin_port = htons(process.self().port);
>>> > - addr.sin_addr.s_addr = process.self().ip;
>>> > + addr.sin_port = htons(process.self().node.port);
>>> > + addr.sin_addr.s_addr = process.self().node.ip;
>>> >
>>> > ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
>>> >
>>> > @@ -1866,7 +1866,7 @@ TEST(Process, PercentEncodedURLs)
>>> > spawn(process);
>>> >
>>> > // Construct the PID using percent-encoding.
>>> > - UPID pid("id%2842%29", process.self().ip, process.self().port);
>>> > + UPID pid("id%2842%29", process.self().node);
>>> >
>>> > // Mimic a libprocess message sent to an installed handler.
>>> > Future<Nothing> handler1;
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> Dominic Hamon | @mrdo | Twitter
>> *There are no bad ideas; only good ideas that go horribly wrong.*
>>
>
>
>
> --
> Dominic Hamon | @mrdo | Twitter
> *There are no bad ideas; only good ideas that go horribly wrong.*
>
--
Dominic Hamon | @mrdo | Twitter
*There are no bad ideas; only good ideas that go horribly wrong.*
Re: [1/2] mesos git commit: libprocess: Replaced the ip and port
pairs from UPID class and process namespace with Node class.
Posted by Dominic Hamon <dh...@twopensource.com>.
https://reviews.apache.org/r/27932/
On Wed, Nov 12, 2014 at 2:15 PM, Dominic Hamon <dh...@twopensource.com>
wrote:
> thanks for the head's up - it didn't show up in my linux clang build.
>
> testing a fix now.
>
> On Wed, Nov 12, 2014 at 2:12 PM, Cody Maloney <co...@mesosphere.io> wrote:
>
>> I'm getting a new warning with this on OS X w/ clang 3.5 which looks like
>> it probably should be addressed (Add parens to call the node function):
>>
>> ../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49:
>> warning:
>> address of function 'process::node' will always evaluate to 'true'
>> [-Wbool-conversion]
>> VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
>> ~~~~~~~ ^~~~
>> ../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49: note:
>> prefix with the address-of operator to silence this warning
>> VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
>> ^
>> &
>>
>> On Wed, Nov 12, 2014 at 1:05 PM, <dm...@apache.org> wrote:
>>
>> > Repository: mesos
>> > Updated Branches:
>> > refs/heads/master 3f693f23a -> 76bfb4930
>> >
>> >
>> > libprocess: Replaced the ip and port pairs from UPID class and process
>> > namespace with Node class.
>> >
>> > At the moment, the Node class is used to keep a mapping from a socket to
>> > the ip & port pair in the process namespace.
>> > I want to propose to extend its use by replacing the ip & port fields
>> from
>> > the UPID class and process namespace with this type.
>> >
>> > Review: https://reviews.apache.org/r/27446
>> >
>> >
>> > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f64562fa
>> > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f64562fa
>> > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f64562fa
>> >
>> > Branch: refs/heads/master
>> > Commit: f64562fa66a272b695560971d0e548d131f42682
>> > Parents: 3f693f2
>> > Author: Evelina Dumitrescu <ev...@gmail.com>
>> > Authored: Wed Nov 12 12:56:23 2014 -0800
>> > Committer: Dominic Hamon <dh...@twitter.com>
>> > Committed: Wed Nov 12 12:56:47 2014 -0800
>> >
>> > ----------------------------------------------------------------------
>> > 3rdparty/libprocess/include/process/node.hpp | 28 ++++++-
>> > 3rdparty/libprocess/include/process/pid.hpp | 49 +++++------
>> > 3rdparty/libprocess/include/process/process.hpp | 11 +--
>> > 3rdparty/libprocess/src/http.cpp | 13 +--
>> > 3rdparty/libprocess/src/pid.cpp | 28 +++----
>> > 3rdparty/libprocess/src/process.cpp | 88
>> ++++++++------------
>> > 3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
>> > 3rdparty/libprocess/src/tests/http_tests.cpp | 4 +-
>> > 3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
>> > 3rdparty/libprocess/src/tests/process_tests.cpp | 6 +-
>> > 10 files changed, 103 insertions(+), 134 deletions(-)
>> > ----------------------------------------------------------------------
>> >
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/node.hpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/include/process/node.hpp
>> > b/3rdparty/libprocess/include/process/node.hpp
>> > index 7a96894..24132a5 100644
>> > --- a/3rdparty/libprocess/include/process/node.hpp
>> > +++ b/3rdparty/libprocess/include/process/node.hpp
>> > @@ -1,17 +1,22 @@
>> > #ifndef __PROCESS_NODE_HPP__
>> > #define __PROCESS_NODE_HPP__
>> >
>> > +#include <arpa/inet.h>
>> > #include <unistd.h>
>> >
>> > #include <sstream>
>> >
>> > +#include <glog/logging.h>
>> > +
>> > namespace process {
>> >
>> > // Represents a remote "node" (encapsulates IP address and port).
>> > class Node
>> > {
>> > public:
>> > - Node(uint32_t _ip = 0, uint16_t _port = 0) : ip(_ip), port(_port) {}
>> > + Node() : ip(0), port(0) {}
>> > +
>> > + Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
>> >
>> > bool operator < (const Node& that) const
>> > {
>> > @@ -22,16 +27,31 @@ public:
>> > }
>> > }
>> >
>> > - std::ostream& operator << (std::ostream& stream) const
>> > + bool operator == (const Node& that) const
>> > + {
>> > + return (ip == that.ip && port == that.port);
>> > + }
>> > +
>> > + bool operator != (const Node& that) const
>> > {
>> > - stream << ip << ":" << port;
>> > - return stream;
>> > + return !(*this == that);
>> > }
>> >
>> > uint32_t ip;
>> > uint16_t port;
>> > };
>> >
>> > +inline std::ostream& operator << (std::ostream & stream, const Node &
>> > node)
>> > +{
>> > + char ip[INET_ADDRSTRLEN];
>> > + if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) ==
>> > NULL) {
>> > + PLOG(FATAL) << "Failed to get human-readable IP address for '"
>> > + << node.ip << "'";
>> > + }
>> > + stream << ip << ":" << node.port;
>> > + return stream;
>> > +}
>> > +
>> > } // namespace process {
>> >
>> > #endif // __PROCESS_NODE_HPP__
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/pid.hpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/include/process/pid.hpp
>> > b/3rdparty/libprocess/include/process/pid.hpp
>> > index 2345322..7dccf29 100644
>> > --- a/3rdparty/libprocess/include/process/pid.hpp
>> > +++ b/3rdparty/libprocess/include/process/pid.hpp
>> > @@ -7,6 +7,7 @@
>> > #include <sstream>
>> > #include <string>
>> >
>> > +#include <process/node.hpp>
>> >
>> > namespace process {
>> >
>> > @@ -16,17 +17,22 @@ class ProcessBase;
>> >
>> > struct UPID
>> > {
>> > - UPID()
>> > - : ip(0), port(0) {}
>> > + UPID() = default;
>> >
>> > UPID(const UPID& that)
>> > - : id(that.id), ip(that.ip), port(that.port) {}
>> > + : id(that.id), node(that.node) {}
>> >
>> > UPID(const char* id_, uint32_t ip_, uint16_t port_)
>> > - : id(id_), ip(ip_), port(port_) {}
>> > + : id(id_), node(ip_, port_) {}
>> > +
>> > + UPID(const char* id_, const Node& node_)
>> > + : id(id_), node(node_) {}
>> >
>> > UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
>> > - : id(id_), ip(ip_), port(port_) {}
>> > + : id(id_), node(ip_, port_) {}
>> > +
>> > + UPID(const std::string& id_, const Node& node_)
>> > + : id(id_), node(node_) {}
>> >
>> > /*implicit*/ UPID(const char* s);
>> >
>> > @@ -38,47 +44,33 @@ struct UPID
>> >
>> > operator bool () const
>> > {
>> > - return id != "" && ip != 0 && port != 0;
>> > + return id != "" && node.ip != 0 && node.port != 0;
>> > }
>> >
>> > bool operator ! () const // NOLINT(whitespace/operators)
>> > {
>> > - return id == "" && ip == 0 && port == 0;
>> > + return id == "" && node.ip == 0 && node.port == 0;
>> > }
>> >
>> > bool operator < (const UPID& that) const
>> > {
>> > - if (this != &that) {
>> > - if (ip == that.ip && port == that.port)
>> > - return id < that.id;
>> > - else if (ip == that.ip && port != that.port)
>> > - return port < that.port;
>> > - else
>> > - return ip < that.ip;
>> > - }
>> > -
>> > - return false;
>> > + if (node == that.node)
>> > + return id < that.id;
>> > + else return node < that.node;
>> > }
>> >
>> > bool operator == (const UPID& that) const
>> > {
>> > - if (this != &that) {
>> > - return (id == that.id &&
>> > - ip == that.ip &&
>> > - port == that.port);
>> > - }
>> > -
>> > - return true;
>> > + return (id == that.id && node == that.node);
>> > }
>> >
>> > bool operator != (const UPID& that) const
>> > {
>> > - return !(this->operator == (that));
>> > + return !(*this == that);
>> > }
>> >
>> > std::string id;
>> > - uint32_t ip;
>> > - uint16_t port;
>> > + Node node;
>> > };
>> >
>> >
>> > @@ -99,8 +91,7 @@ struct PID : UPID
>> > (void)base; // Eliminate unused base warning.
>> > PID<Base> pid;
>> > pid.id = id;
>> > - pid.ip = ip;
>> > - pid.port = port;
>> > + pid.node = node;
>> > return pid;
>> > }
>> > };
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/process.hpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/include/process/process.hpp
>> > b/3rdparty/libprocess/include/process/process.hpp
>> > index 81a1f7a..cb3e0a6 100644
>> > --- a/3rdparty/libprocess/include/process/process.hpp
>> > +++ b/3rdparty/libprocess/include/process/process.hpp
>> > @@ -276,16 +276,9 @@ void finalize();
>> >
>> >
>> > /**
>> > - * Returns the IP address associated with this instance of the
>> > - * library.
>> > + * Returns the node associated with this instance of the library.
>> > */
>> > -uint32_t ip();
>> > -
>> > -
>> > -/**
>> > - * Returns the port associated with this instance of the library.
>> > - */
>> > -uint16_t port();
>> > +Node node();
>> >
>> >
>> > /**
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/http.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/http.cpp
>> > b/3rdparty/libprocess/src/http.cpp
>> > index 4ef00d1..b00f333 100644
>> > --- a/3rdparty/libprocess/src/http.cpp
>> > +++ b/3rdparty/libprocess/src/http.cpp
>> > @@ -79,20 +79,13 @@ Future<Response> request(
>> > return Failure("Failed to cloexec: " + cloexec.error());
>> > }
>> >
>> > - // We use inet_ntop since inet_ntoa is not thread-safe!
>> > - char ip[INET_ADDRSTRLEN];
>> > - if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) ==
>> > NULL) {
>> > - return Failure(ErrnoError("Failed to get human-readable IP address
>> > for '" +
>> > - stringify(upid.ip) + "'"));
>> > - }
>> > -
>> > - const string host = string(ip) + ":" + stringify(upid.port);
>> > + const string host = stringify(upid.node);
>> >
>> > sockaddr_in addr;
>> > memset(&addr, 0, sizeof(addr));
>> > addr.sin_family = AF_INET;
>> > - addr.sin_port = htons(upid.port);
>> > - addr.sin_addr.s_addr = upid.ip;
>> > + addr.sin_port = htons(upid.node.port);
>> > + addr.sin_addr.s_addr = upid.node.ip;
>> >
>> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
>> > os::close(s);
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/pid.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/pid.cpp
>> > b/3rdparty/libprocess/src/pid.cpp
>> > index 20ff25c..a2c620e 100644
>> > --- a/3rdparty/libprocess/src/pid.cpp
>> > +++ b/3rdparty/libprocess/src/pid.cpp
>> > @@ -47,8 +47,7 @@ UPID::UPID(const std::string& s)
>> > UPID::UPID(const ProcessBase& process)
>> > {
>> > id = process.self().id;
>> > - ip = process.self().ip;
>> > - port = process.self().port;
>> > + node = process.self().node;
>> > }
>> >
>> >
>> > @@ -62,12 +61,7 @@ UPID::operator std::string() const
>> >
>> > ostream& operator << (ostream& stream, const UPID& pid)
>> > {
>> > - // Call inet_ntop since inet_ntoa is not thread-safe!
>> > - char ip[INET_ADDRSTRLEN];
>> > - if (inet_ntop(AF_INET, (in_addr *) &pid.ip, ip, INET_ADDRSTRLEN) ==
>> > NULL)
>> > - memset(ip, 0, INET_ADDRSTRLEN);
>> > -
>> > - stream << pid.id << "@" << ip << ":" << pid.port;
>> > + stream << pid.id << "@" << pid.node;
>> > return stream;
>> > }
>> >
>> > @@ -75,8 +69,8 @@ ostream& operator << (ostream& stream, const UPID&
>> pid)
>> > istream& operator >> (istream& stream, UPID& pid)
>> > {
>> > pid.id = "";
>> > - pid.ip = 0;
>> > - pid.port = 0;
>> > + pid.node.ip = 0;
>> > + pid.node.port = 0;
>> >
>> > string str;
>> > if (!(stream >> str)) {
>> > @@ -93,8 +87,7 @@ istream& operator >> (istream& stream, UPID& pid)
>> >
>> > string id;
>> > string host;
>> > - uint32_t ip;
>> > - uint16_t port;
>> > + Node node;
>> >
>> > size_t index = str.find('@');
>> >
>> > @@ -149,20 +142,19 @@ istream& operator >> (istream& stream, UPID& pid)
>> > return stream;
>> > }
>> >
>> > - ip = *((uint32_t*) hep->h_addr_list[0]);
>> > + node.ip = *((uint32_t*) hep->h_addr_list[0]);
>> >
>> > delete[] temp;
>> >
>> > str = str.substr(index + 1);
>> >
>> > - if (sscanf(str.c_str(), "%hu", &port) != 1) {
>> > + if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
>> > stream.setstate(std::ios_base::badbit);
>> > return stream;
>> > }
>> >
>> > pid.id = id;
>> > - pid.ip = ip;
>> > - pid.port = port;
>> > + pid.node = node;
>> >
>> > return stream;
>> > }
>> > @@ -172,8 +164,8 @@ size_t hash_value(const UPID& pid)
>> > {
>> > size_t seed = 0;
>> > boost::hash_combine(seed, pid.id);
>> > - boost::hash_combine(seed, pid.ip);
>> > - boost::hash_combine(seed, pid.port);
>> > + boost::hash_combine(seed, pid.node.ip);
>> > + boost::hash_combine(seed, pid.node.port);
>> > return seed;
>> > }
>> >
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/process.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/process.cpp
>> > b/3rdparty/libprocess/src/process.cpp
>> > index 85fb995..a34b870 100644
>> > --- a/3rdparty/libprocess/src/process.cpp
>> > +++ b/3rdparty/libprocess/src/process.cpp
>> > @@ -436,11 +436,8 @@ static uint32_t __id__ = 0;
>> > // Local server socket.
>> > static int __s__ = -1;
>> >
>> > -// Local IP address.
>> > -static uint32_t __ip__ = 0;
>> > -
>> > -// Local port.
>> > -static uint16_t __port__ = 0;
>> > +// Local node.
>> > +static Node __node__;
>> >
>> > // Active SocketManager (eventually will probably be thread-local).
>> > static SocketManager* socket_manager = NULL;
>> > @@ -709,7 +706,7 @@ static Message* encode(const UPID& from,
>> >
>> > static void transport(Message* message, ProcessBase* sender = NULL)
>> > {
>> > - if (message->to.ip == __ip__ && message->to.port == __port__) {
>> > + if (message->to.node == __node__) {
>> > // Local message.
>> > process_manager->deliver(message->to, new MessageEvent(message),
>> > sender);
>> > } else {
>> > @@ -766,7 +763,7 @@ static Message* parse(Request* request)
>> > return NULL;
>> > }
>> >
>> > - const UPID to(decode.get(), __ip__, __port__);
>> > + const UPID to(decode.get(), __node__);
>> >
>> > // And now determine 'name'.
>> > index = index != string::npos ? index + 2: request->path.size();
>> > @@ -1472,15 +1469,15 @@ void initialize(const string& delegate)
>> > }
>> > }
>> >
>> > - __ip__ = 0;
>> > - __port__ = 0;
>> > + __node__.ip = 0;
>> > + __node__.port = 0;
>> >
>> > char* value;
>> >
>> > // Check environment for ip.
>> > value = getenv("LIBPROCESS_IP");
>> > if (value != NULL) {
>> > - int result = inet_pton(AF_INET, value, &__ip__);
>> > + int result = inet_pton(AF_INET, value, &__node__.ip);
>> > if (result == 0) {
>> > LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
>> > } else if (result < 0) {
>> > @@ -1495,7 +1492,7 @@ void initialize(const string& delegate)
>> > if (result < 0 || result > USHRT_MAX) {
>> > LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid
>> port";
>> > }
>> > - __port__ = result;
>> > + __node__.port = result;
>> > }
>> >
>> > // Create a "server" socket for communicating with other nodes.
>> > @@ -1525,12 +1522,11 @@ void initialize(const string& delegate)
>> > sockaddr_in addr;
>> > memset(&addr, 0, sizeof(addr));
>> > addr.sin_family = PF_INET;
>> > - addr.sin_addr.s_addr = __ip__;
>> > - addr.sin_port = htons(__port__);
>> > + addr.sin_addr.s_addr = __node__.ip;
>> > + addr.sin_port = htons(__node__.port);
>> >
>> > if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
>> > - PLOG(FATAL) << "Failed to initialize, bind "
>> > - << inet_ntoa(addr.sin_addr) << ":" << __port__;
>> > + PLOG(FATAL) << "Failed to initialize, bind " << __node__;
>> > }
>> >
>> > // Lookup and store assigned ip and assigned port.
>> > @@ -1539,14 +1535,14 @@ void initialize(const string& delegate)
>> > PLOG(FATAL) << "Failed to initialize, getsockname";
>> > }
>> >
>> > - __ip__ = addr.sin_addr.s_addr;
>> > - __port__ = ntohs(addr.sin_port);
>> > + __node__.ip = addr.sin_addr.s_addr;
>> > + __node__.port = ntohs(addr.sin_port);
>> >
>> > // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
>> > // actually have a valid external ip address. Note that we need only
>> > // one ip address, so that other processes can send and receive and
>> > // don't get confused as to whom they are sending to.
>> > - if (__ip__ == 0 || __ip__ == 2130706433) {
>> > + if (__node__.ip == 0 || __node__.ip == 2130706433) {
>> > char hostname[512];
>> >
>> > if (gethostname(hostname, sizeof(hostname)) < 0) {
>> > @@ -1562,7 +1558,7 @@ void initialize(const string& delegate)
>> > << hstrerror(h_errno);
>> > }
>> >
>> > - __ip__ = *((uint32_t *) he->h_addr_list[0]);
>> > + __node__.ip = *((uint32_t *) he->h_addr_list[0]);
>> > }
>> >
>> > if (listen(__s__, 500000) < 0) {
>> > @@ -1663,13 +1659,8 @@ void initialize(const string& delegate)
>> >
>> > new Route("/__processes__", None(), __processes__);
>> >
>> > - char temp[INET_ADDRSTRLEN];
>> > - if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) ==
>> > NULL) {
>> > - PLOG(FATAL) << "Failed to initialize, inet_ntop";
>> > - }
>> > -
>> > - VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
>> > - << " for " << cpus << " cpus";
>> > + VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
>> > + << " cpus";
>> > }
>> >
>> >
>> > @@ -1679,17 +1670,10 @@ void finalize()
>> > }
>> >
>> >
>> > -uint32_t ip()
>> > -{
>> > - process::initialize();
>> > - return __ip__;
>> > -}
>> > -
>> > -
>> > -uint16_t port()
>> > +Node node()
>> > {
>> > process::initialize();
>> > - return __port__;
>> > + return __node__;
>> > }
>> >
>> >
>> > @@ -1968,12 +1952,9 @@ void SocketManager::link(ProcessBase* process,
>> > const UPID& to)
>> >
>> > CHECK(process != NULL);
>> >
>> > - Node node(to.ip, to.port);
>> > -
>> > synchronized (this) {
>> > // Check if node is remote and there isn't a persistant link.
>> > - if ((node.ip != __ip__ || node.port != __port__)
>> > - && persists.count(node) == 0) {
>> > + if (to.node != __node__ && persists.count(to.node) == 0) {
>> > // Okay, no link, let's create a socket.
>> > Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
>> > if (socket.isError()) {
>> > @@ -1993,9 +1974,9 @@ void SocketManager::link(ProcessBase* process,
>> const
>> > UPID& to)
>> > }
>> >
>> > sockets[s] = Socket(s);
>> > - nodes[s] = node;
>> > + nodes[s] = to.node;
>> >
>> > - persists[node] = s;
>> > + persists[to.node] = s;
>> >
>> > // Allocate and initialize a watcher for reading data from this
>> > // socket. Note that we don't expect to receive anything other
>> > @@ -2009,8 +1990,8 @@ void SocketManager::link(ProcessBase* process,
>> const
>> > UPID& to)
>> > sockaddr_in addr;
>> > memset(&addr, 0, sizeof(addr));
>> > addr.sin_family = PF_INET;
>> > - addr.sin_port = htons(to.port);
>> > - addr.sin_addr.s_addr = to.ip;
>> > + addr.sin_port = htons(to.node.port);
>> > + addr.sin_addr.s_addr = to.node.ip;
>> >
>> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
>> > if (errno != EINPROGRESS) {
>> > @@ -2130,7 +2111,7 @@ void SocketManager::send(Message* message)
>> > {
>> > CHECK(message != NULL);
>> >
>> > - Node node(message->to.ip, message->to.port);
>> > + Node node(message->to.node);
>> >
>> > synchronized (this) {
>> > // Check if there is already a socket.
>> > @@ -2190,8 +2171,8 @@ void SocketManager::send(Message* message)
>> > sockaddr_in addr;
>> > memset(&addr, 0, sizeof(addr));
>> > addr.sin_family = PF_INET;
>> > - addr.sin_port = htons(message->to.port);
>> > - addr.sin_addr.s_addr = message->to.ip;
>> > + addr.sin_port = htons(node.port);
>> > + addr.sin_addr.s_addr = node.ip;
>> >
>> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
>> > if (errno != EINPROGRESS) {
>> > @@ -2380,7 +2361,7 @@ void SocketManager::exited(const Node& node)
>> > list<UPID> removed;
>> > // Look up all linked processes.
>> > foreachpair (const UPID& linkee, set<ProcessBase*>& processes,
>> links)
>> > {
>> > - if (linkee.ip == node.ip && linkee.port == node.port) {
>> > + if (linkee.node == node) {
>> > foreach (ProcessBase* linker, processes) {
>> > linker->enqueue(new ExitedEvent(linkee));
>> > }
>> > @@ -2461,7 +2442,7 @@ ProcessManager::~ProcessManager()
>> >
>> > ProcessReference ProcessManager::use(const UPID& pid)
>> > {
>> > - if (pid.ip == __ip__ && pid.port == __port__) {
>> > + if (pid.node == __node__) {
>> > synchronized (processes) {
>> > if (processes.count(pid.id) > 0) {
>> > // Note that the ProcessReference constructor _must_ get
>> > @@ -2567,12 +2548,12 @@ bool ProcessManager::handle(
>> >
>> > if (tokens.size() == 0 && delegate != "") {
>> > request->path = "/" + delegate;
>> > - receiver = use(UPID(delegate, __ip__, __port__));
>> > + receiver = use(UPID(delegate, __node__));
>> > } else if (tokens.size() > 0) {
>> > // Decode possible percent-encoded path.
>> > Try<string> decode = http::decode(tokens[0]);
>> > if (!decode.isError()) {
>> > - receiver = use(UPID(decode.get(), __ip__, __port__));
>> > + receiver = use(UPID(decode.get(), __node__));
>> > } else {
>> > VLOG(1) << "Failed to decode URL path: " << decode.error();
>> > }
>> > @@ -2581,7 +2562,7 @@ bool ProcessManager::handle(
>> > if (!receiver && delegate != "") {
>> > // Try and delegate the request.
>> > request->path = "/" + delegate + request->path;
>> > - receiver = use(UPID(delegate, __ip__, __port__));
>> > + receiver = use(UPID(delegate, __node__));
>> > }
>> >
>> > if (receiver) {
>> > @@ -2900,7 +2881,7 @@ void ProcessManager::cleanup(ProcessBase* process)
>> > void ProcessManager::link(ProcessBase* process, const UPID& to)
>> > {
>> > // Check if the pid is local.
>> > - if (!(to.ip == __ip__ && to.port == __port__)) {
>> > + if (to.node != __node__) {
>> > socket_manager->link(process, to);
>> > } else {
>> > // Since the pid is local we want to get a reference to it's
>> > @@ -3249,8 +3230,7 @@ ProcessBase::ProcessBase(const string& id)
>> > refs = 0;
>> >
>> > pid.id = id != "" ? id : ID::generate();
>> > - pid.ip = __ip__;
>> > - pid.port = __port__;
>> > + pid.node = __node__;
>> >
>> > // If using a manual clock, try and set current time of process
>> > // using happens before relationship between creator and createe!
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/benchmarks.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp
>> > b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> > index 3177a8e..227b8e7 100644
>> > --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>> > +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> > @@ -102,9 +102,9 @@ public:
>> > private:
>> > void ping(const UPID& from, const string& body)
>> > {
>> > - if (linkedPorts.find(from.port) == linkedPorts.end()) {
>> > + if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
>> > setLink(from);
>> > - linkedPorts.insert(from.port);
>> > + linkedPorts.insert(from.node.port);
>> > }
>> > static const string message("hi");
>> > send(from, "pong", message.c_str(), message.size());
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/http_tests.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp
>> > b/3rdparty/libprocess/src/tests/http_tests.cpp
>> > index a1c3685..a90e65f 100644
>> > --- a/3rdparty/libprocess/src/tests/http_tests.cpp
>> > +++ b/3rdparty/libprocess/src/tests/http_tests.cpp
>> > @@ -120,8 +120,8 @@ TEST(HTTP, Endpoints)
>> > sockaddr_in addr;
>> > memset(&addr, 0, sizeof(addr));
>> > addr.sin_family = PF_INET;
>> > - addr.sin_port = htons(process.self().port);
>> > - addr.sin_addr.s_addr = process.self().ip;
>> > + addr.sin_port = htons(process.self().node.port);
>> > + addr.sin_addr.s_addr = process.self().node.ip;
>> >
>> > ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
>> >
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/metrics_tests.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp
>> > b/3rdparty/libprocess/src/tests/metrics_tests.cpp
>> > index 33539e4..0c80c69 100644
>> > --- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
>> > +++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
>> > @@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
>> > {
>> > ASSERT_TRUE(GTEST_IS_THREADSAFE);
>> >
>> > - UPID upid("metrics", process::ip(), process::port());
>> > + UPID upid("metrics", process::node());
>> >
>> > Clock::pause();
>> >
>> > @@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
>> > {
>> > ASSERT_TRUE(GTEST_IS_THREADSAFE);
>> >
>> > - UPID upid("metrics", process::ip(), process::port());
>> > + UPID upid("metrics", process::node());
>> >
>> > Clock::pause();
>> >
>> > @@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
>> > // Ensures that the aggregate statistics are correct in the snapshot.
>> > TEST(Metrics, SnapshotStatistics)
>> > {
>> > - UPID upid("metrics", process::ip(), process::port());
>> > + UPID upid("metrics", process::node());
>> >
>> > Clock::pause();
>> >
>> >
>> >
>> >
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/process_tests.cpp
>> > ----------------------------------------------------------------------
>> > diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp
>> > b/3rdparty/libprocess/src/tests/process_tests.cpp
>> > index b985fb7..902d4d3 100644
>> > --- a/3rdparty/libprocess/src/tests/process_tests.cpp
>> > +++ b/3rdparty/libprocess/src/tests/process_tests.cpp
>> > @@ -1425,8 +1425,8 @@ TEST(Process, remote)
>> > sockaddr_in addr;
>> > memset(&addr, 0, sizeof(addr));
>> > addr.sin_family = PF_INET;
>> > - addr.sin_port = htons(process.self().port);
>> > - addr.sin_addr.s_addr = process.self().ip;
>> > + addr.sin_port = htons(process.self().node.port);
>> > + addr.sin_addr.s_addr = process.self().node.ip;
>> >
>> > ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
>> >
>> > @@ -1866,7 +1866,7 @@ TEST(Process, PercentEncodedURLs)
>> > spawn(process);
>> >
>> > // Construct the PID using percent-encoding.
>> > - UPID pid("id%2842%29", process.self().ip, process.self().port);
>> > + UPID pid("id%2842%29", process.self().node);
>> >
>> > // Mimic a libprocess message sent to an installed handler.
>> > Future<Nothing> handler1;
>> >
>> >
>>
>
>
>
> --
> Dominic Hamon | @mrdo | Twitter
> *There are no bad ideas; only good ideas that go horribly wrong.*
>
--
Dominic Hamon | @mrdo | Twitter
*There are no bad ideas; only good ideas that go horribly wrong.*
Re: [1/2] mesos git commit: libprocess: Replaced the ip and port
pairs from UPID class and process namespace with Node class.
Posted by Dominic Hamon <dh...@twopensource.com>.
thanks for the head's up - it didn't show up in my linux clang build.
testing a fix now.
On Wed, Nov 12, 2014 at 2:12 PM, Cody Maloney <co...@mesosphere.io> wrote:
> I'm getting a new warning with this on OS X w/ clang 3.5 which looks like
> it probably should be addressed (Add parens to call the node function):
>
> ../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49: warning:
> address of function 'process::node' will always evaluate to 'true'
> [-Wbool-conversion]
> VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
> ~~~~~~~ ^~~~
> ../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49: note:
> prefix with the address-of operator to silence this warning
> VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
> ^
> &
>
> On Wed, Nov 12, 2014 at 1:05 PM, <dm...@apache.org> wrote:
>
> > Repository: mesos
> > Updated Branches:
> > refs/heads/master 3f693f23a -> 76bfb4930
> >
> >
> > libprocess: Replaced the ip and port pairs from UPID class and process
> > namespace with Node class.
> >
> > At the moment, the Node class is used to keep a mapping from a socket to
> > the ip & port pair in the process namespace.
> > I want to propose to extend its use by replacing the ip & port fields
> from
> > the UPID class and process namespace with this type.
> >
> > Review: https://reviews.apache.org/r/27446
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f64562fa
> > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f64562fa
> > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f64562fa
> >
> > Branch: refs/heads/master
> > Commit: f64562fa66a272b695560971d0e548d131f42682
> > Parents: 3f693f2
> > Author: Evelina Dumitrescu <ev...@gmail.com>
> > Authored: Wed Nov 12 12:56:23 2014 -0800
> > Committer: Dominic Hamon <dh...@twitter.com>
> > Committed: Wed Nov 12 12:56:47 2014 -0800
> >
> > ----------------------------------------------------------------------
> > 3rdparty/libprocess/include/process/node.hpp | 28 ++++++-
> > 3rdparty/libprocess/include/process/pid.hpp | 49 +++++------
> > 3rdparty/libprocess/include/process/process.hpp | 11 +--
> > 3rdparty/libprocess/src/http.cpp | 13 +--
> > 3rdparty/libprocess/src/pid.cpp | 28 +++----
> > 3rdparty/libprocess/src/process.cpp | 88
> ++++++++------------
> > 3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
> > 3rdparty/libprocess/src/tests/http_tests.cpp | 4 +-
> > 3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
> > 3rdparty/libprocess/src/tests/process_tests.cpp | 6 +-
> > 10 files changed, 103 insertions(+), 134 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/node.hpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/include/process/node.hpp
> > b/3rdparty/libprocess/include/process/node.hpp
> > index 7a96894..24132a5 100644
> > --- a/3rdparty/libprocess/include/process/node.hpp
> > +++ b/3rdparty/libprocess/include/process/node.hpp
> > @@ -1,17 +1,22 @@
> > #ifndef __PROCESS_NODE_HPP__
> > #define __PROCESS_NODE_HPP__
> >
> > +#include <arpa/inet.h>
> > #include <unistd.h>
> >
> > #include <sstream>
> >
> > +#include <glog/logging.h>
> > +
> > namespace process {
> >
> > // Represents a remote "node" (encapsulates IP address and port).
> > class Node
> > {
> > public:
> > - Node(uint32_t _ip = 0, uint16_t _port = 0) : ip(_ip), port(_port) {}
> > + Node() : ip(0), port(0) {}
> > +
> > + Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
> >
> > bool operator < (const Node& that) const
> > {
> > @@ -22,16 +27,31 @@ public:
> > }
> > }
> >
> > - std::ostream& operator << (std::ostream& stream) const
> > + bool operator == (const Node& that) const
> > + {
> > + return (ip == that.ip && port == that.port);
> > + }
> > +
> > + bool operator != (const Node& that) const
> > {
> > - stream << ip << ":" << port;
> > - return stream;
> > + return !(*this == that);
> > }
> >
> > uint32_t ip;
> > uint16_t port;
> > };
> >
> > +inline std::ostream& operator << (std::ostream & stream, const Node &
> > node)
> > +{
> > + char ip[INET_ADDRSTRLEN];
> > + if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) ==
> > NULL) {
> > + PLOG(FATAL) << "Failed to get human-readable IP address for '"
> > + << node.ip << "'";
> > + }
> > + stream << ip << ":" << node.port;
> > + return stream;
> > +}
> > +
> > } // namespace process {
> >
> > #endif // __PROCESS_NODE_HPP__
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/pid.hpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/include/process/pid.hpp
> > b/3rdparty/libprocess/include/process/pid.hpp
> > index 2345322..7dccf29 100644
> > --- a/3rdparty/libprocess/include/process/pid.hpp
> > +++ b/3rdparty/libprocess/include/process/pid.hpp
> > @@ -7,6 +7,7 @@
> > #include <sstream>
> > #include <string>
> >
> > +#include <process/node.hpp>
> >
> > namespace process {
> >
> > @@ -16,17 +17,22 @@ class ProcessBase;
> >
> > struct UPID
> > {
> > - UPID()
> > - : ip(0), port(0) {}
> > + UPID() = default;
> >
> > UPID(const UPID& that)
> > - : id(that.id), ip(that.ip), port(that.port) {}
> > + : id(that.id), node(that.node) {}
> >
> > UPID(const char* id_, uint32_t ip_, uint16_t port_)
> > - : id(id_), ip(ip_), port(port_) {}
> > + : id(id_), node(ip_, port_) {}
> > +
> > + UPID(const char* id_, const Node& node_)
> > + : id(id_), node(node_) {}
> >
> > UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
> > - : id(id_), ip(ip_), port(port_) {}
> > + : id(id_), node(ip_, port_) {}
> > +
> > + UPID(const std::string& id_, const Node& node_)
> > + : id(id_), node(node_) {}
> >
> > /*implicit*/ UPID(const char* s);
> >
> > @@ -38,47 +44,33 @@ struct UPID
> >
> > operator bool () const
> > {
> > - return id != "" && ip != 0 && port != 0;
> > + return id != "" && node.ip != 0 && node.port != 0;
> > }
> >
> > bool operator ! () const // NOLINT(whitespace/operators)
> > {
> > - return id == "" && ip == 0 && port == 0;
> > + return id == "" && node.ip == 0 && node.port == 0;
> > }
> >
> > bool operator < (const UPID& that) const
> > {
> > - if (this != &that) {
> > - if (ip == that.ip && port == that.port)
> > - return id < that.id;
> > - else if (ip == that.ip && port != that.port)
> > - return port < that.port;
> > - else
> > - return ip < that.ip;
> > - }
> > -
> > - return false;
> > + if (node == that.node)
> > + return id < that.id;
> > + else return node < that.node;
> > }
> >
> > bool operator == (const UPID& that) const
> > {
> > - if (this != &that) {
> > - return (id == that.id &&
> > - ip == that.ip &&
> > - port == that.port);
> > - }
> > -
> > - return true;
> > + return (id == that.id && node == that.node);
> > }
> >
> > bool operator != (const UPID& that) const
> > {
> > - return !(this->operator == (that));
> > + return !(*this == that);
> > }
> >
> > std::string id;
> > - uint32_t ip;
> > - uint16_t port;
> > + Node node;
> > };
> >
> >
> > @@ -99,8 +91,7 @@ struct PID : UPID
> > (void)base; // Eliminate unused base warning.
> > PID<Base> pid;
> > pid.id = id;
> > - pid.ip = ip;
> > - pid.port = port;
> > + pid.node = node;
> > return pid;
> > }
> > };
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/process.hpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/include/process/process.hpp
> > b/3rdparty/libprocess/include/process/process.hpp
> > index 81a1f7a..cb3e0a6 100644
> > --- a/3rdparty/libprocess/include/process/process.hpp
> > +++ b/3rdparty/libprocess/include/process/process.hpp
> > @@ -276,16 +276,9 @@ void finalize();
> >
> >
> > /**
> > - * Returns the IP address associated with this instance of the
> > - * library.
> > + * Returns the node associated with this instance of the library.
> > */
> > -uint32_t ip();
> > -
> > -
> > -/**
> > - * Returns the port associated with this instance of the library.
> > - */
> > -uint16_t port();
> > +Node node();
> >
> >
> > /**
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/http.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/http.cpp
> > b/3rdparty/libprocess/src/http.cpp
> > index 4ef00d1..b00f333 100644
> > --- a/3rdparty/libprocess/src/http.cpp
> > +++ b/3rdparty/libprocess/src/http.cpp
> > @@ -79,20 +79,13 @@ Future<Response> request(
> > return Failure("Failed to cloexec: " + cloexec.error());
> > }
> >
> > - // We use inet_ntop since inet_ntoa is not thread-safe!
> > - char ip[INET_ADDRSTRLEN];
> > - if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) ==
> > NULL) {
> > - return Failure(ErrnoError("Failed to get human-readable IP address
> > for '" +
> > - stringify(upid.ip) + "'"));
> > - }
> > -
> > - const string host = string(ip) + ":" + stringify(upid.port);
> > + const string host = stringify(upid.node);
> >
> > sockaddr_in addr;
> > memset(&addr, 0, sizeof(addr));
> > addr.sin_family = AF_INET;
> > - addr.sin_port = htons(upid.port);
> > - addr.sin_addr.s_addr = upid.ip;
> > + addr.sin_port = htons(upid.node.port);
> > + addr.sin_addr.s_addr = upid.node.ip;
> >
> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
> > os::close(s);
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/pid.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/pid.cpp
> > b/3rdparty/libprocess/src/pid.cpp
> > index 20ff25c..a2c620e 100644
> > --- a/3rdparty/libprocess/src/pid.cpp
> > +++ b/3rdparty/libprocess/src/pid.cpp
> > @@ -47,8 +47,7 @@ UPID::UPID(const std::string& s)
> > UPID::UPID(const ProcessBase& process)
> > {
> > id = process.self().id;
> > - ip = process.self().ip;
> > - port = process.self().port;
> > + node = process.self().node;
> > }
> >
> >
> > @@ -62,12 +61,7 @@ UPID::operator std::string() const
> >
> > ostream& operator << (ostream& stream, const UPID& pid)
> > {
> > - // Call inet_ntop since inet_ntoa is not thread-safe!
> > - char ip[INET_ADDRSTRLEN];
> > - if (inet_ntop(AF_INET, (in_addr *) &pid.ip, ip, INET_ADDRSTRLEN) ==
> > NULL)
> > - memset(ip, 0, INET_ADDRSTRLEN);
> > -
> > - stream << pid.id << "@" << ip << ":" << pid.port;
> > + stream << pid.id << "@" << pid.node;
> > return stream;
> > }
> >
> > @@ -75,8 +69,8 @@ ostream& operator << (ostream& stream, const UPID& pid)
> > istream& operator >> (istream& stream, UPID& pid)
> > {
> > pid.id = "";
> > - pid.ip = 0;
> > - pid.port = 0;
> > + pid.node.ip = 0;
> > + pid.node.port = 0;
> >
> > string str;
> > if (!(stream >> str)) {
> > @@ -93,8 +87,7 @@ istream& operator >> (istream& stream, UPID& pid)
> >
> > string id;
> > string host;
> > - uint32_t ip;
> > - uint16_t port;
> > + Node node;
> >
> > size_t index = str.find('@');
> >
> > @@ -149,20 +142,19 @@ istream& operator >> (istream& stream, UPID& pid)
> > return stream;
> > }
> >
> > - ip = *((uint32_t*) hep->h_addr_list[0]);
> > + node.ip = *((uint32_t*) hep->h_addr_list[0]);
> >
> > delete[] temp;
> >
> > str = str.substr(index + 1);
> >
> > - if (sscanf(str.c_str(), "%hu", &port) != 1) {
> > + if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
> > stream.setstate(std::ios_base::badbit);
> > return stream;
> > }
> >
> > pid.id = id;
> > - pid.ip = ip;
> > - pid.port = port;
> > + pid.node = node;
> >
> > return stream;
> > }
> > @@ -172,8 +164,8 @@ size_t hash_value(const UPID& pid)
> > {
> > size_t seed = 0;
> > boost::hash_combine(seed, pid.id);
> > - boost::hash_combine(seed, pid.ip);
> > - boost::hash_combine(seed, pid.port);
> > + boost::hash_combine(seed, pid.node.ip);
> > + boost::hash_combine(seed, pid.node.port);
> > return seed;
> > }
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/process.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/process.cpp
> > b/3rdparty/libprocess/src/process.cpp
> > index 85fb995..a34b870 100644
> > --- a/3rdparty/libprocess/src/process.cpp
> > +++ b/3rdparty/libprocess/src/process.cpp
> > @@ -436,11 +436,8 @@ static uint32_t __id__ = 0;
> > // Local server socket.
> > static int __s__ = -1;
> >
> > -// Local IP address.
> > -static uint32_t __ip__ = 0;
> > -
> > -// Local port.
> > -static uint16_t __port__ = 0;
> > +// Local node.
> > +static Node __node__;
> >
> > // Active SocketManager (eventually will probably be thread-local).
> > static SocketManager* socket_manager = NULL;
> > @@ -709,7 +706,7 @@ static Message* encode(const UPID& from,
> >
> > static void transport(Message* message, ProcessBase* sender = NULL)
> > {
> > - if (message->to.ip == __ip__ && message->to.port == __port__) {
> > + if (message->to.node == __node__) {
> > // Local message.
> > process_manager->deliver(message->to, new MessageEvent(message),
> > sender);
> > } else {
> > @@ -766,7 +763,7 @@ static Message* parse(Request* request)
> > return NULL;
> > }
> >
> > - const UPID to(decode.get(), __ip__, __port__);
> > + const UPID to(decode.get(), __node__);
> >
> > // And now determine 'name'.
> > index = index != string::npos ? index + 2: request->path.size();
> > @@ -1472,15 +1469,15 @@ void initialize(const string& delegate)
> > }
> > }
> >
> > - __ip__ = 0;
> > - __port__ = 0;
> > + __node__.ip = 0;
> > + __node__.port = 0;
> >
> > char* value;
> >
> > // Check environment for ip.
> > value = getenv("LIBPROCESS_IP");
> > if (value != NULL) {
> > - int result = inet_pton(AF_INET, value, &__ip__);
> > + int result = inet_pton(AF_INET, value, &__node__.ip);
> > if (result == 0) {
> > LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
> > } else if (result < 0) {
> > @@ -1495,7 +1492,7 @@ void initialize(const string& delegate)
> > if (result < 0 || result > USHRT_MAX) {
> > LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid
> port";
> > }
> > - __port__ = result;
> > + __node__.port = result;
> > }
> >
> > // Create a "server" socket for communicating with other nodes.
> > @@ -1525,12 +1522,11 @@ void initialize(const string& delegate)
> > sockaddr_in addr;
> > memset(&addr, 0, sizeof(addr));
> > addr.sin_family = PF_INET;
> > - addr.sin_addr.s_addr = __ip__;
> > - addr.sin_port = htons(__port__);
> > + addr.sin_addr.s_addr = __node__.ip;
> > + addr.sin_port = htons(__node__.port);
> >
> > if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
> > - PLOG(FATAL) << "Failed to initialize, bind "
> > - << inet_ntoa(addr.sin_addr) << ":" << __port__;
> > + PLOG(FATAL) << "Failed to initialize, bind " << __node__;
> > }
> >
> > // Lookup and store assigned ip and assigned port.
> > @@ -1539,14 +1535,14 @@ void initialize(const string& delegate)
> > PLOG(FATAL) << "Failed to initialize, getsockname";
> > }
> >
> > - __ip__ = addr.sin_addr.s_addr;
> > - __port__ = ntohs(addr.sin_port);
> > + __node__.ip = addr.sin_addr.s_addr;
> > + __node__.port = ntohs(addr.sin_port);
> >
> > // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
> > // actually have a valid external ip address. Note that we need only
> > // one ip address, so that other processes can send and receive and
> > // don't get confused as to whom they are sending to.
> > - if (__ip__ == 0 || __ip__ == 2130706433) {
> > + if (__node__.ip == 0 || __node__.ip == 2130706433) {
> > char hostname[512];
> >
> > if (gethostname(hostname, sizeof(hostname)) < 0) {
> > @@ -1562,7 +1558,7 @@ void initialize(const string& delegate)
> > << hstrerror(h_errno);
> > }
> >
> > - __ip__ = *((uint32_t *) he->h_addr_list[0]);
> > + __node__.ip = *((uint32_t *) he->h_addr_list[0]);
> > }
> >
> > if (listen(__s__, 500000) < 0) {
> > @@ -1663,13 +1659,8 @@ void initialize(const string& delegate)
> >
> > new Route("/__processes__", None(), __processes__);
> >
> > - char temp[INET_ADDRSTRLEN];
> > - if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) ==
> > NULL) {
> > - PLOG(FATAL) << "Failed to initialize, inet_ntop";
> > - }
> > -
> > - VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
> > - << " for " << cpus << " cpus";
> > + VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
> > + << " cpus";
> > }
> >
> >
> > @@ -1679,17 +1670,10 @@ void finalize()
> > }
> >
> >
> > -uint32_t ip()
> > -{
> > - process::initialize();
> > - return __ip__;
> > -}
> > -
> > -
> > -uint16_t port()
> > +Node node()
> > {
> > process::initialize();
> > - return __port__;
> > + return __node__;
> > }
> >
> >
> > @@ -1968,12 +1952,9 @@ void SocketManager::link(ProcessBase* process,
> > const UPID& to)
> >
> > CHECK(process != NULL);
> >
> > - Node node(to.ip, to.port);
> > -
> > synchronized (this) {
> > // Check if node is remote and there isn't a persistant link.
> > - if ((node.ip != __ip__ || node.port != __port__)
> > - && persists.count(node) == 0) {
> > + if (to.node != __node__ && persists.count(to.node) == 0) {
> > // Okay, no link, let's create a socket.
> > Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
> > if (socket.isError()) {
> > @@ -1993,9 +1974,9 @@ void SocketManager::link(ProcessBase* process,
> const
> > UPID& to)
> > }
> >
> > sockets[s] = Socket(s);
> > - nodes[s] = node;
> > + nodes[s] = to.node;
> >
> > - persists[node] = s;
> > + persists[to.node] = s;
> >
> > // Allocate and initialize a watcher for reading data from this
> > // socket. Note that we don't expect to receive anything other
> > @@ -2009,8 +1990,8 @@ void SocketManager::link(ProcessBase* process,
> const
> > UPID& to)
> > sockaddr_in addr;
> > memset(&addr, 0, sizeof(addr));
> > addr.sin_family = PF_INET;
> > - addr.sin_port = htons(to.port);
> > - addr.sin_addr.s_addr = to.ip;
> > + addr.sin_port = htons(to.node.port);
> > + addr.sin_addr.s_addr = to.node.ip;
> >
> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
> > if (errno != EINPROGRESS) {
> > @@ -2130,7 +2111,7 @@ void SocketManager::send(Message* message)
> > {
> > CHECK(message != NULL);
> >
> > - Node node(message->to.ip, message->to.port);
> > + Node node(message->to.node);
> >
> > synchronized (this) {
> > // Check if there is already a socket.
> > @@ -2190,8 +2171,8 @@ void SocketManager::send(Message* message)
> > sockaddr_in addr;
> > memset(&addr, 0, sizeof(addr));
> > addr.sin_family = PF_INET;
> > - addr.sin_port = htons(message->to.port);
> > - addr.sin_addr.s_addr = message->to.ip;
> > + addr.sin_port = htons(node.port);
> > + addr.sin_addr.s_addr = node.ip;
> >
> > if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
> > if (errno != EINPROGRESS) {
> > @@ -2380,7 +2361,7 @@ void SocketManager::exited(const Node& node)
> > list<UPID> removed;
> > // Look up all linked processes.
> > foreachpair (const UPID& linkee, set<ProcessBase*>& processes,
> links)
> > {
> > - if (linkee.ip == node.ip && linkee.port == node.port) {
> > + if (linkee.node == node) {
> > foreach (ProcessBase* linker, processes) {
> > linker->enqueue(new ExitedEvent(linkee));
> > }
> > @@ -2461,7 +2442,7 @@ ProcessManager::~ProcessManager()
> >
> > ProcessReference ProcessManager::use(const UPID& pid)
> > {
> > - if (pid.ip == __ip__ && pid.port == __port__) {
> > + if (pid.node == __node__) {
> > synchronized (processes) {
> > if (processes.count(pid.id) > 0) {
> > // Note that the ProcessReference constructor _must_ get
> > @@ -2567,12 +2548,12 @@ bool ProcessManager::handle(
> >
> > if (tokens.size() == 0 && delegate != "") {
> > request->path = "/" + delegate;
> > - receiver = use(UPID(delegate, __ip__, __port__));
> > + receiver = use(UPID(delegate, __node__));
> > } else if (tokens.size() > 0) {
> > // Decode possible percent-encoded path.
> > Try<string> decode = http::decode(tokens[0]);
> > if (!decode.isError()) {
> > - receiver = use(UPID(decode.get(), __ip__, __port__));
> > + receiver = use(UPID(decode.get(), __node__));
> > } else {
> > VLOG(1) << "Failed to decode URL path: " << decode.error();
> > }
> > @@ -2581,7 +2562,7 @@ bool ProcessManager::handle(
> > if (!receiver && delegate != "") {
> > // Try and delegate the request.
> > request->path = "/" + delegate + request->path;
> > - receiver = use(UPID(delegate, __ip__, __port__));
> > + receiver = use(UPID(delegate, __node__));
> > }
> >
> > if (receiver) {
> > @@ -2900,7 +2881,7 @@ void ProcessManager::cleanup(ProcessBase* process)
> > void ProcessManager::link(ProcessBase* process, const UPID& to)
> > {
> > // Check if the pid is local.
> > - if (!(to.ip == __ip__ && to.port == __port__)) {
> > + if (to.node != __node__) {
> > socket_manager->link(process, to);
> > } else {
> > // Since the pid is local we want to get a reference to it's
> > @@ -3249,8 +3230,7 @@ ProcessBase::ProcessBase(const string& id)
> > refs = 0;
> >
> > pid.id = id != "" ? id : ID::generate();
> > - pid.ip = __ip__;
> > - pid.port = __port__;
> > + pid.node = __node__;
> >
> > // If using a manual clock, try and set current time of process
> > // using happens before relationship between creator and createe!
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/benchmarks.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp
> > b/3rdparty/libprocess/src/tests/benchmarks.cpp
> > index 3177a8e..227b8e7 100644
> > --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
> > +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
> > @@ -102,9 +102,9 @@ public:
> > private:
> > void ping(const UPID& from, const string& body)
> > {
> > - if (linkedPorts.find(from.port) == linkedPorts.end()) {
> > + if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
> > setLink(from);
> > - linkedPorts.insert(from.port);
> > + linkedPorts.insert(from.node.port);
> > }
> > static const string message("hi");
> > send(from, "pong", message.c_str(), message.size());
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/http_tests.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp
> > b/3rdparty/libprocess/src/tests/http_tests.cpp
> > index a1c3685..a90e65f 100644
> > --- a/3rdparty/libprocess/src/tests/http_tests.cpp
> > +++ b/3rdparty/libprocess/src/tests/http_tests.cpp
> > @@ -120,8 +120,8 @@ TEST(HTTP, Endpoints)
> > sockaddr_in addr;
> > memset(&addr, 0, sizeof(addr));
> > addr.sin_family = PF_INET;
> > - addr.sin_port = htons(process.self().port);
> > - addr.sin_addr.s_addr = process.self().ip;
> > + addr.sin_port = htons(process.self().node.port);
> > + addr.sin_addr.s_addr = process.self().node.ip;
> >
> > ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/metrics_tests.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp
> > b/3rdparty/libprocess/src/tests/metrics_tests.cpp
> > index 33539e4..0c80c69 100644
> > --- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
> > +++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
> > @@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
> > {
> > ASSERT_TRUE(GTEST_IS_THREADSAFE);
> >
> > - UPID upid("metrics", process::ip(), process::port());
> > + UPID upid("metrics", process::node());
> >
> > Clock::pause();
> >
> > @@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
> > {
> > ASSERT_TRUE(GTEST_IS_THREADSAFE);
> >
> > - UPID upid("metrics", process::ip(), process::port());
> > + UPID upid("metrics", process::node());
> >
> > Clock::pause();
> >
> > @@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
> > // Ensures that the aggregate statistics are correct in the snapshot.
> > TEST(Metrics, SnapshotStatistics)
> > {
> > - UPID upid("metrics", process::ip(), process::port());
> > + UPID upid("metrics", process::node());
> >
> > Clock::pause();
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/process_tests.cpp
> > ----------------------------------------------------------------------
> > diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp
> > b/3rdparty/libprocess/src/tests/process_tests.cpp
> > index b985fb7..902d4d3 100644
> > --- a/3rdparty/libprocess/src/tests/process_tests.cpp
> > +++ b/3rdparty/libprocess/src/tests/process_tests.cpp
> > @@ -1425,8 +1425,8 @@ TEST(Process, remote)
> > sockaddr_in addr;
> > memset(&addr, 0, sizeof(addr));
> > addr.sin_family = PF_INET;
> > - addr.sin_port = htons(process.self().port);
> > - addr.sin_addr.s_addr = process.self().ip;
> > + addr.sin_port = htons(process.self().node.port);
> > + addr.sin_addr.s_addr = process.self().node.ip;
> >
> > ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
> >
> > @@ -1866,7 +1866,7 @@ TEST(Process, PercentEncodedURLs)
> > spawn(process);
> >
> > // Construct the PID using percent-encoding.
> > - UPID pid("id%2842%29", process.self().ip, process.self().port);
> > + UPID pid("id%2842%29", process.self().node);
> >
> > // Mimic a libprocess message sent to an installed handler.
> > Future<Nothing> handler1;
> >
> >
>
--
Dominic Hamon | @mrdo | Twitter
*There are no bad ideas; only good ideas that go horribly wrong.*
Re: [1/2] mesos git commit: libprocess: Replaced the ip and port
pairs from UPID class and process namespace with Node class.
Posted by Cody Maloney <co...@mesosphere.io>.
I'm getting a new warning with this on OS X w/ clang 3.5 which looks like
it probably should be addressed (Add parens to call the node function):
../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49: warning:
address of function 'process::node' will always evaluate to 'true'
[-Wbool-conversion]
VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
~~~~~~~ ^~~~
../../../mesos_public/3rdparty/libprocess/src/process.cpp:1662:49: note:
prefix with the address-of operator to silence this warning
VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
^
&
On Wed, Nov 12, 2014 at 1:05 PM, <dm...@apache.org> wrote:
> Repository: mesos
> Updated Branches:
> refs/heads/master 3f693f23a -> 76bfb4930
>
>
> libprocess: Replaced the ip and port pairs from UPID class and process
> namespace with Node class.
>
> At the moment, the Node class is used to keep a mapping from a socket to
> the ip & port pair in the process namespace.
> I want to propose to extend its use by replacing the ip & port fields from
> the UPID class and process namespace with this type.
>
> Review: https://reviews.apache.org/r/27446
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f64562fa
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f64562fa
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f64562fa
>
> Branch: refs/heads/master
> Commit: f64562fa66a272b695560971d0e548d131f42682
> Parents: 3f693f2
> Author: Evelina Dumitrescu <ev...@gmail.com>
> Authored: Wed Nov 12 12:56:23 2014 -0800
> Committer: Dominic Hamon <dh...@twitter.com>
> Committed: Wed Nov 12 12:56:47 2014 -0800
>
> ----------------------------------------------------------------------
> 3rdparty/libprocess/include/process/node.hpp | 28 ++++++-
> 3rdparty/libprocess/include/process/pid.hpp | 49 +++++------
> 3rdparty/libprocess/include/process/process.hpp | 11 +--
> 3rdparty/libprocess/src/http.cpp | 13 +--
> 3rdparty/libprocess/src/pid.cpp | 28 +++----
> 3rdparty/libprocess/src/process.cpp | 88 ++++++++------------
> 3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
> 3rdparty/libprocess/src/tests/http_tests.cpp | 4 +-
> 3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
> 3rdparty/libprocess/src/tests/process_tests.cpp | 6 +-
> 10 files changed, 103 insertions(+), 134 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/node.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/include/process/node.hpp
> b/3rdparty/libprocess/include/process/node.hpp
> index 7a96894..24132a5 100644
> --- a/3rdparty/libprocess/include/process/node.hpp
> +++ b/3rdparty/libprocess/include/process/node.hpp
> @@ -1,17 +1,22 @@
> #ifndef __PROCESS_NODE_HPP__
> #define __PROCESS_NODE_HPP__
>
> +#include <arpa/inet.h>
> #include <unistd.h>
>
> #include <sstream>
>
> +#include <glog/logging.h>
> +
> namespace process {
>
> // Represents a remote "node" (encapsulates IP address and port).
> class Node
> {
> public:
> - Node(uint32_t _ip = 0, uint16_t _port = 0) : ip(_ip), port(_port) {}
> + Node() : ip(0), port(0) {}
> +
> + Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
>
> bool operator < (const Node& that) const
> {
> @@ -22,16 +27,31 @@ public:
> }
> }
>
> - std::ostream& operator << (std::ostream& stream) const
> + bool operator == (const Node& that) const
> + {
> + return (ip == that.ip && port == that.port);
> + }
> +
> + bool operator != (const Node& that) const
> {
> - stream << ip << ":" << port;
> - return stream;
> + return !(*this == that);
> }
>
> uint32_t ip;
> uint16_t port;
> };
>
> +inline std::ostream& operator << (std::ostream & stream, const Node &
> node)
> +{
> + char ip[INET_ADDRSTRLEN];
> + if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) ==
> NULL) {
> + PLOG(FATAL) << "Failed to get human-readable IP address for '"
> + << node.ip << "'";
> + }
> + stream << ip << ":" << node.port;
> + return stream;
> +}
> +
> } // namespace process {
>
> #endif // __PROCESS_NODE_HPP__
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/pid.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/include/process/pid.hpp
> b/3rdparty/libprocess/include/process/pid.hpp
> index 2345322..7dccf29 100644
> --- a/3rdparty/libprocess/include/process/pid.hpp
> +++ b/3rdparty/libprocess/include/process/pid.hpp
> @@ -7,6 +7,7 @@
> #include <sstream>
> #include <string>
>
> +#include <process/node.hpp>
>
> namespace process {
>
> @@ -16,17 +17,22 @@ class ProcessBase;
>
> struct UPID
> {
> - UPID()
> - : ip(0), port(0) {}
> + UPID() = default;
>
> UPID(const UPID& that)
> - : id(that.id), ip(that.ip), port(that.port) {}
> + : id(that.id), node(that.node) {}
>
> UPID(const char* id_, uint32_t ip_, uint16_t port_)
> - : id(id_), ip(ip_), port(port_) {}
> + : id(id_), node(ip_, port_) {}
> +
> + UPID(const char* id_, const Node& node_)
> + : id(id_), node(node_) {}
>
> UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
> - : id(id_), ip(ip_), port(port_) {}
> + : id(id_), node(ip_, port_) {}
> +
> + UPID(const std::string& id_, const Node& node_)
> + : id(id_), node(node_) {}
>
> /*implicit*/ UPID(const char* s);
>
> @@ -38,47 +44,33 @@ struct UPID
>
> operator bool () const
> {
> - return id != "" && ip != 0 && port != 0;
> + return id != "" && node.ip != 0 && node.port != 0;
> }
>
> bool operator ! () const // NOLINT(whitespace/operators)
> {
> - return id == "" && ip == 0 && port == 0;
> + return id == "" && node.ip == 0 && node.port == 0;
> }
>
> bool operator < (const UPID& that) const
> {
> - if (this != &that) {
> - if (ip == that.ip && port == that.port)
> - return id < that.id;
> - else if (ip == that.ip && port != that.port)
> - return port < that.port;
> - else
> - return ip < that.ip;
> - }
> -
> - return false;
> + if (node == that.node)
> + return id < that.id;
> + else return node < that.node;
> }
>
> bool operator == (const UPID& that) const
> {
> - if (this != &that) {
> - return (id == that.id &&
> - ip == that.ip &&
> - port == that.port);
> - }
> -
> - return true;
> + return (id == that.id && node == that.node);
> }
>
> bool operator != (const UPID& that) const
> {
> - return !(this->operator == (that));
> + return !(*this == that);
> }
>
> std::string id;
> - uint32_t ip;
> - uint16_t port;
> + Node node;
> };
>
>
> @@ -99,8 +91,7 @@ struct PID : UPID
> (void)base; // Eliminate unused base warning.
> PID<Base> pid;
> pid.id = id;
> - pid.ip = ip;
> - pid.port = port;
> + pid.node = node;
> return pid;
> }
> };
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/process.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/include/process/process.hpp
> b/3rdparty/libprocess/include/process/process.hpp
> index 81a1f7a..cb3e0a6 100644
> --- a/3rdparty/libprocess/include/process/process.hpp
> +++ b/3rdparty/libprocess/include/process/process.hpp
> @@ -276,16 +276,9 @@ void finalize();
>
>
> /**
> - * Returns the IP address associated with this instance of the
> - * library.
> + * Returns the node associated with this instance of the library.
> */
> -uint32_t ip();
> -
> -
> -/**
> - * Returns the port associated with this instance of the library.
> - */
> -uint16_t port();
> +Node node();
>
>
> /**
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/http.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/http.cpp
> b/3rdparty/libprocess/src/http.cpp
> index 4ef00d1..b00f333 100644
> --- a/3rdparty/libprocess/src/http.cpp
> +++ b/3rdparty/libprocess/src/http.cpp
> @@ -79,20 +79,13 @@ Future<Response> request(
> return Failure("Failed to cloexec: " + cloexec.error());
> }
>
> - // We use inet_ntop since inet_ntoa is not thread-safe!
> - char ip[INET_ADDRSTRLEN];
> - if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) ==
> NULL) {
> - return Failure(ErrnoError("Failed to get human-readable IP address
> for '" +
> - stringify(upid.ip) + "'"));
> - }
> -
> - const string host = string(ip) + ":" + stringify(upid.port);
> + const string host = stringify(upid.node);
>
> sockaddr_in addr;
> memset(&addr, 0, sizeof(addr));
> addr.sin_family = AF_INET;
> - addr.sin_port = htons(upid.port);
> - addr.sin_addr.s_addr = upid.ip;
> + addr.sin_port = htons(upid.node.port);
> + addr.sin_addr.s_addr = upid.node.ip;
>
> if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
> os::close(s);
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/pid.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/pid.cpp
> b/3rdparty/libprocess/src/pid.cpp
> index 20ff25c..a2c620e 100644
> --- a/3rdparty/libprocess/src/pid.cpp
> +++ b/3rdparty/libprocess/src/pid.cpp
> @@ -47,8 +47,7 @@ UPID::UPID(const std::string& s)
> UPID::UPID(const ProcessBase& process)
> {
> id = process.self().id;
> - ip = process.self().ip;
> - port = process.self().port;
> + node = process.self().node;
> }
>
>
> @@ -62,12 +61,7 @@ UPID::operator std::string() const
>
> ostream& operator << (ostream& stream, const UPID& pid)
> {
> - // Call inet_ntop since inet_ntoa is not thread-safe!
> - char ip[INET_ADDRSTRLEN];
> - if (inet_ntop(AF_INET, (in_addr *) &pid.ip, ip, INET_ADDRSTRLEN) ==
> NULL)
> - memset(ip, 0, INET_ADDRSTRLEN);
> -
> - stream << pid.id << "@" << ip << ":" << pid.port;
> + stream << pid.id << "@" << pid.node;
> return stream;
> }
>
> @@ -75,8 +69,8 @@ ostream& operator << (ostream& stream, const UPID& pid)
> istream& operator >> (istream& stream, UPID& pid)
> {
> pid.id = "";
> - pid.ip = 0;
> - pid.port = 0;
> + pid.node.ip = 0;
> + pid.node.port = 0;
>
> string str;
> if (!(stream >> str)) {
> @@ -93,8 +87,7 @@ istream& operator >> (istream& stream, UPID& pid)
>
> string id;
> string host;
> - uint32_t ip;
> - uint16_t port;
> + Node node;
>
> size_t index = str.find('@');
>
> @@ -149,20 +142,19 @@ istream& operator >> (istream& stream, UPID& pid)
> return stream;
> }
>
> - ip = *((uint32_t*) hep->h_addr_list[0]);
> + node.ip = *((uint32_t*) hep->h_addr_list[0]);
>
> delete[] temp;
>
> str = str.substr(index + 1);
>
> - if (sscanf(str.c_str(), "%hu", &port) != 1) {
> + if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
> stream.setstate(std::ios_base::badbit);
> return stream;
> }
>
> pid.id = id;
> - pid.ip = ip;
> - pid.port = port;
> + pid.node = node;
>
> return stream;
> }
> @@ -172,8 +164,8 @@ size_t hash_value(const UPID& pid)
> {
> size_t seed = 0;
> boost::hash_combine(seed, pid.id);
> - boost::hash_combine(seed, pid.ip);
> - boost::hash_combine(seed, pid.port);
> + boost::hash_combine(seed, pid.node.ip);
> + boost::hash_combine(seed, pid.node.port);
> return seed;
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/process.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/process.cpp
> b/3rdparty/libprocess/src/process.cpp
> index 85fb995..a34b870 100644
> --- a/3rdparty/libprocess/src/process.cpp
> +++ b/3rdparty/libprocess/src/process.cpp
> @@ -436,11 +436,8 @@ static uint32_t __id__ = 0;
> // Local server socket.
> static int __s__ = -1;
>
> -// Local IP address.
> -static uint32_t __ip__ = 0;
> -
> -// Local port.
> -static uint16_t __port__ = 0;
> +// Local node.
> +static Node __node__;
>
> // Active SocketManager (eventually will probably be thread-local).
> static SocketManager* socket_manager = NULL;
> @@ -709,7 +706,7 @@ static Message* encode(const UPID& from,
>
> static void transport(Message* message, ProcessBase* sender = NULL)
> {
> - if (message->to.ip == __ip__ && message->to.port == __port__) {
> + if (message->to.node == __node__) {
> // Local message.
> process_manager->deliver(message->to, new MessageEvent(message),
> sender);
> } else {
> @@ -766,7 +763,7 @@ static Message* parse(Request* request)
> return NULL;
> }
>
> - const UPID to(decode.get(), __ip__, __port__);
> + const UPID to(decode.get(), __node__);
>
> // And now determine 'name'.
> index = index != string::npos ? index + 2: request->path.size();
> @@ -1472,15 +1469,15 @@ void initialize(const string& delegate)
> }
> }
>
> - __ip__ = 0;
> - __port__ = 0;
> + __node__.ip = 0;
> + __node__.port = 0;
>
> char* value;
>
> // Check environment for ip.
> value = getenv("LIBPROCESS_IP");
> if (value != NULL) {
> - int result = inet_pton(AF_INET, value, &__ip__);
> + int result = inet_pton(AF_INET, value, &__node__.ip);
> if (result == 0) {
> LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
> } else if (result < 0) {
> @@ -1495,7 +1492,7 @@ void initialize(const string& delegate)
> if (result < 0 || result > USHRT_MAX) {
> LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
> }
> - __port__ = result;
> + __node__.port = result;
> }
>
> // Create a "server" socket for communicating with other nodes.
> @@ -1525,12 +1522,11 @@ void initialize(const string& delegate)
> sockaddr_in addr;
> memset(&addr, 0, sizeof(addr));
> addr.sin_family = PF_INET;
> - addr.sin_addr.s_addr = __ip__;
> - addr.sin_port = htons(__port__);
> + addr.sin_addr.s_addr = __node__.ip;
> + addr.sin_port = htons(__node__.port);
>
> if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
> - PLOG(FATAL) << "Failed to initialize, bind "
> - << inet_ntoa(addr.sin_addr) << ":" << __port__;
> + PLOG(FATAL) << "Failed to initialize, bind " << __node__;
> }
>
> // Lookup and store assigned ip and assigned port.
> @@ -1539,14 +1535,14 @@ void initialize(const string& delegate)
> PLOG(FATAL) << "Failed to initialize, getsockname";
> }
>
> - __ip__ = addr.sin_addr.s_addr;
> - __port__ = ntohs(addr.sin_port);
> + __node__.ip = addr.sin_addr.s_addr;
> + __node__.port = ntohs(addr.sin_port);
>
> // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
> // actually have a valid external ip address. Note that we need only
> // one ip address, so that other processes can send and receive and
> // don't get confused as to whom they are sending to.
> - if (__ip__ == 0 || __ip__ == 2130706433) {
> + if (__node__.ip == 0 || __node__.ip == 2130706433) {
> char hostname[512];
>
> if (gethostname(hostname, sizeof(hostname)) < 0) {
> @@ -1562,7 +1558,7 @@ void initialize(const string& delegate)
> << hstrerror(h_errno);
> }
>
> - __ip__ = *((uint32_t *) he->h_addr_list[0]);
> + __node__.ip = *((uint32_t *) he->h_addr_list[0]);
> }
>
> if (listen(__s__, 500000) < 0) {
> @@ -1663,13 +1659,8 @@ void initialize(const string& delegate)
>
> new Route("/__processes__", None(), __processes__);
>
> - char temp[INET_ADDRSTRLEN];
> - if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) ==
> NULL) {
> - PLOG(FATAL) << "Failed to initialize, inet_ntop";
> - }
> -
> - VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
> - << " for " << cpus << " cpus";
> + VLOG(1) << "libprocess is initialized on " << node << " for " << cpus
> + << " cpus";
> }
>
>
> @@ -1679,17 +1670,10 @@ void finalize()
> }
>
>
> -uint32_t ip()
> -{
> - process::initialize();
> - return __ip__;
> -}
> -
> -
> -uint16_t port()
> +Node node()
> {
> process::initialize();
> - return __port__;
> + return __node__;
> }
>
>
> @@ -1968,12 +1952,9 @@ void SocketManager::link(ProcessBase* process,
> const UPID& to)
>
> CHECK(process != NULL);
>
> - Node node(to.ip, to.port);
> -
> synchronized (this) {
> // Check if node is remote and there isn't a persistant link.
> - if ((node.ip != __ip__ || node.port != __port__)
> - && persists.count(node) == 0) {
> + if (to.node != __node__ && persists.count(to.node) == 0) {
> // Okay, no link, let's create a socket.
> Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
> if (socket.isError()) {
> @@ -1993,9 +1974,9 @@ void SocketManager::link(ProcessBase* process, const
> UPID& to)
> }
>
> sockets[s] = Socket(s);
> - nodes[s] = node;
> + nodes[s] = to.node;
>
> - persists[node] = s;
> + persists[to.node] = s;
>
> // Allocate and initialize a watcher for reading data from this
> // socket. Note that we don't expect to receive anything other
> @@ -2009,8 +1990,8 @@ void SocketManager::link(ProcessBase* process, const
> UPID& to)
> sockaddr_in addr;
> memset(&addr, 0, sizeof(addr));
> addr.sin_family = PF_INET;
> - addr.sin_port = htons(to.port);
> - addr.sin_addr.s_addr = to.ip;
> + addr.sin_port = htons(to.node.port);
> + addr.sin_addr.s_addr = to.node.ip;
>
> if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
> if (errno != EINPROGRESS) {
> @@ -2130,7 +2111,7 @@ void SocketManager::send(Message* message)
> {
> CHECK(message != NULL);
>
> - Node node(message->to.ip, message->to.port);
> + Node node(message->to.node);
>
> synchronized (this) {
> // Check if there is already a socket.
> @@ -2190,8 +2171,8 @@ void SocketManager::send(Message* message)
> sockaddr_in addr;
> memset(&addr, 0, sizeof(addr));
> addr.sin_family = PF_INET;
> - addr.sin_port = htons(message->to.port);
> - addr.sin_addr.s_addr = message->to.ip;
> + addr.sin_port = htons(node.port);
> + addr.sin_addr.s_addr = node.ip;
>
> if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
> if (errno != EINPROGRESS) {
> @@ -2380,7 +2361,7 @@ void SocketManager::exited(const Node& node)
> list<UPID> removed;
> // Look up all linked processes.
> foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links)
> {
> - if (linkee.ip == node.ip && linkee.port == node.port) {
> + if (linkee.node == node) {
> foreach (ProcessBase* linker, processes) {
> linker->enqueue(new ExitedEvent(linkee));
> }
> @@ -2461,7 +2442,7 @@ ProcessManager::~ProcessManager()
>
> ProcessReference ProcessManager::use(const UPID& pid)
> {
> - if (pid.ip == __ip__ && pid.port == __port__) {
> + if (pid.node == __node__) {
> synchronized (processes) {
> if (processes.count(pid.id) > 0) {
> // Note that the ProcessReference constructor _must_ get
> @@ -2567,12 +2548,12 @@ bool ProcessManager::handle(
>
> if (tokens.size() == 0 && delegate != "") {
> request->path = "/" + delegate;
> - receiver = use(UPID(delegate, __ip__, __port__));
> + receiver = use(UPID(delegate, __node__));
> } else if (tokens.size() > 0) {
> // Decode possible percent-encoded path.
> Try<string> decode = http::decode(tokens[0]);
> if (!decode.isError()) {
> - receiver = use(UPID(decode.get(), __ip__, __port__));
> + receiver = use(UPID(decode.get(), __node__));
> } else {
> VLOG(1) << "Failed to decode URL path: " << decode.error();
> }
> @@ -2581,7 +2562,7 @@ bool ProcessManager::handle(
> if (!receiver && delegate != "") {
> // Try and delegate the request.
> request->path = "/" + delegate + request->path;
> - receiver = use(UPID(delegate, __ip__, __port__));
> + receiver = use(UPID(delegate, __node__));
> }
>
> if (receiver) {
> @@ -2900,7 +2881,7 @@ void ProcessManager::cleanup(ProcessBase* process)
> void ProcessManager::link(ProcessBase* process, const UPID& to)
> {
> // Check if the pid is local.
> - if (!(to.ip == __ip__ && to.port == __port__)) {
> + if (to.node != __node__) {
> socket_manager->link(process, to);
> } else {
> // Since the pid is local we want to get a reference to it's
> @@ -3249,8 +3230,7 @@ ProcessBase::ProcessBase(const string& id)
> refs = 0;
>
> pid.id = id != "" ? id : ID::generate();
> - pid.ip = __ip__;
> - pid.port = __port__;
> + pid.node = __node__;
>
> // If using a manual clock, try and set current time of process
> // using happens before relationship between creator and createe!
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/benchmarks.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp
> b/3rdparty/libprocess/src/tests/benchmarks.cpp
> index 3177a8e..227b8e7 100644
> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
> @@ -102,9 +102,9 @@ public:
> private:
> void ping(const UPID& from, const string& body)
> {
> - if (linkedPorts.find(from.port) == linkedPorts.end()) {
> + if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
> setLink(from);
> - linkedPorts.insert(from.port);
> + linkedPorts.insert(from.node.port);
> }
> static const string message("hi");
> send(from, "pong", message.c_str(), message.size());
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/http_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp
> b/3rdparty/libprocess/src/tests/http_tests.cpp
> index a1c3685..a90e65f 100644
> --- a/3rdparty/libprocess/src/tests/http_tests.cpp
> +++ b/3rdparty/libprocess/src/tests/http_tests.cpp
> @@ -120,8 +120,8 @@ TEST(HTTP, Endpoints)
> sockaddr_in addr;
> memset(&addr, 0, sizeof(addr));
> addr.sin_family = PF_INET;
> - addr.sin_port = htons(process.self().port);
> - addr.sin_addr.s_addr = process.self().ip;
> + addr.sin_port = htons(process.self().node.port);
> + addr.sin_addr.s_addr = process.self().node.ip;
>
> ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/metrics_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp
> b/3rdparty/libprocess/src/tests/metrics_tests.cpp
> index 33539e4..0c80c69 100644
> --- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
> +++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
> @@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
> {
> ASSERT_TRUE(GTEST_IS_THREADSAFE);
>
> - UPID upid("metrics", process::ip(), process::port());
> + UPID upid("metrics", process::node());
>
> Clock::pause();
>
> @@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
> {
> ASSERT_TRUE(GTEST_IS_THREADSAFE);
>
> - UPID upid("metrics", process::ip(), process::port());
> + UPID upid("metrics", process::node());
>
> Clock::pause();
>
> @@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
> // Ensures that the aggregate statistics are correct in the snapshot.
> TEST(Metrics, SnapshotStatistics)
> {
> - UPID upid("metrics", process::ip(), process::port());
> + UPID upid("metrics", process::node());
>
> Clock::pause();
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/process_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp
> b/3rdparty/libprocess/src/tests/process_tests.cpp
> index b985fb7..902d4d3 100644
> --- a/3rdparty/libprocess/src/tests/process_tests.cpp
> +++ b/3rdparty/libprocess/src/tests/process_tests.cpp
> @@ -1425,8 +1425,8 @@ TEST(Process, remote)
> sockaddr_in addr;
> memset(&addr, 0, sizeof(addr));
> addr.sin_family = PF_INET;
> - addr.sin_port = htons(process.self().port);
> - addr.sin_addr.s_addr = process.self().ip;
> + addr.sin_port = htons(process.self().node.port);
> + addr.sin_addr.s_addr = process.self().node.ip;
>
> ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
>
> @@ -1866,7 +1866,7 @@ TEST(Process, PercentEncodedURLs)
> spawn(process);
>
> // Construct the PID using percent-encoding.
> - UPID pid("id%2842%29", process.self().ip, process.self().port);
> + UPID pid("id%2842%29", process.self().node);
>
> // Mimic a libprocess message sent to an installed handler.
> Future<Nothing> handler1;
>
>
[2/2] mesos git commit: Replace the ip and port pairs from UPID class
and process namespace with Node class.
Posted by dm...@apache.org.
Replace the ip and port pairs from UPID class and process namespace with Node class.
At the moment, the Node class is used to keep a mapping from a socket to the ip
& port pair in the process namespace. I want to propose to extend its use by
replacing the ip & port fields from the UPID class and process namespace with this
type.
Review: https://reviews.apache.org/r/27447
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/76bfb493
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/76bfb493
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/76bfb493
Branch: refs/heads/master
Commit: 76bfb49301ff80a4535f47bce59d547ba867319f
Parents: f64562f
Author: Evelina Dumitrescu <ev...@gmail.com>
Authored: Wed Nov 12 12:56:54 2014 -0800
Committer: Dominic Hamon <dh...@twitter.com>
Committed: Wed Nov 12 12:58:02 2014 -0800
----------------------------------------------------------------------
src/common/protobuf_utils.cpp | 6 +--
src/master/master.cpp | 10 ++--
src/sched/sched.cpp | 2 +-
src/scheduler/scheduler.cpp | 2 +-
src/slave/http.cpp | 2 +-
src/slave/slave.cpp | 6 +--
src/tests/fetcher_tests.cpp | 4 +-
src/tests/files_tests.cpp | 8 +--
src/tests/gc_tests.cpp | 6 +--
src/tests/logging_tests.cpp | 3 +-
src/tests/master_contender_detector_tests.cpp | 60 +++++++++++-----------
src/tests/master_tests.cpp | 12 ++---
src/tests/monitor_tests.cpp | 2 +-
13 files changed, 61 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 33ce782..e3e430d 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -111,11 +111,11 @@ MasterInfo createMasterInfo(const process::UPID& pid)
{
MasterInfo info;
info.set_id(stringify(pid) + "-" + UUID::random().toString());
- info.set_ip(pid.ip);
- info.set_port(pid.port);
+ info.set_ip(pid.node.ip);
+ info.set_port(pid.node.port);
info.set_pid(pid);
- Try<std::string> hostname = net::getHostname(pid.ip);
+ Try<std::string> hostname = net::getHostname(pid.node.ip);
if (hostname.isSome()) {
info.set_hostname(hostname.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 83fecba..fbf6375 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -282,20 +282,20 @@ Master::Master(
// address and port from self() and the OS PID.
Try<string> id =
strings::format("%s-%u-%u-%d", DateUtils::currentDate(),
- self().ip, self().port, getpid());
+ self().node.ip, self().node.port, getpid());
CHECK(!id.isError()) << id.error();
info_.set_id(id.get());
- info_.set_ip(self().ip);
- info_.set_port(self().port);
+ info_.set_ip(self().node.ip);
+ info_.set_port(self().node.port);
info_.set_pid(self());
// Determine our hostname or use the hostname provided.
string hostname;
if (flags.hostname.isNone()) {
- Try<string> result = net::getHostname(self().ip);
+ Try<string> result = net::getHostname(self().node.ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
@@ -318,7 +318,7 @@ void Master::initialize()
LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
<< " started on " << string(self()).substr(7);
- if (stringify(net::IP(ntohl(self().ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(self().node.ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Master bound to loopback interface!"
<< " Cannot communicate with remote schedulers or slaves."
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 4981dfb..d662182 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1100,7 +1100,7 @@ void MesosSchedulerDriver::initialize() {
// Initialize libprocess.
process::initialize(schedulerId);
- if (stringify(net::IP(ntohl(process::ip()))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(process::node().ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Scheduler driver bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index cbb982a..ff6ff11 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -127,7 +127,7 @@ public:
// want to use flags to initialize libprocess).
process::initialize();
- if (stringify(net::IP(ntohl(self().ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(self().node.ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Scheduler driver bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 965d78c..d1cf8a6 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -361,7 +361,7 @@ Future<Response> Slave::Http::state(const Request& request)
object.values["lost_tasks"] = slave->stats.tasks[TASK_LOST];
if (slave->master.isSome()) {
- Try<string> masterHostname = net::getHostname(slave->master.get().ip);
+ Try<string> masterHostname = net::getHostname(slave->master.get().node.ip);
if (masterHostname.isSome()) {
object.values["master_hostname"] = masterHostname.get();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 99fd055..275081c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -171,7 +171,7 @@ void Slave::initialize()
{
LOG(INFO) << "Slave started on " << string(self()).substr(6);
- if (stringify(net::IP(ntohl(self().ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(self().node.ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Slave bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
@@ -302,7 +302,7 @@ void Slave::initialize()
string hostname;
if (flags.hostname.isNone()) {
- Try<string> result = net::getHostname(self().ip);
+ Try<string> result = net::getHostname(self().node.ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
@@ -315,7 +315,7 @@ void Slave::initialize()
// Initialize slave info.
info.set_hostname(hostname);
- info.set_port(self().port);
+ info.set_port(self().node.port);
info.mutable_resources()->CopyFrom(resources.get());
info.mutable_attributes()->CopyFrom(attributes);
info.set_checkpoint(flags.checkpoint);
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index a01eec1..19aee31 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -331,8 +331,8 @@ TEST_F(FetcherTest, OSNetUriTest)
spawn(process);
- string url = "http://" + net::getHostname(process.self().ip).get() +
- ":" + stringify(process.self().port) + "/help";
+ string url = "http://" + net::getHostname(process.self().node.ip).get() +
+ ":" + stringify(process.self().node.port) + "/help";
string localFile = path::join(os::getcwd(), "help");
EXPECT_FALSE(os::exists(localFile));
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/files_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/files_tests.cpp b/src/tests/files_tests.cpp
index a696aa2..9ad6db5 100644
--- a/src/tests/files_tests.cpp
+++ b/src/tests/files_tests.cpp
@@ -84,7 +84,7 @@ TEST_F(FilesTest, DetachTest)
TEST_F(FilesTest, ReadTest)
{
Files files;
- process::UPID upid("files", process::ip(), process::port());
+ process::UPID upid("files", process::node());
Future<Response> response =
process::http::get(upid, "read.json");
@@ -138,7 +138,7 @@ TEST_F(FilesTest, ReadTest)
TEST_F(FilesTest, ResolveTest)
{
Files files;
- process::UPID upid("files", process::ip(), process::port());
+ process::UPID upid("files", process::node());
// Test the directory / file resolution.
ASSERT_SOME(os::mkdir("1/2"));
@@ -214,7 +214,7 @@ TEST_F(FilesTest, ResolveTest)
TEST_F(FilesTest, BrowseTest)
{
Files files;
- process::UPID upid("files", process::ip(), process::port());
+ process::UPID upid("files", process::node());
ASSERT_SOME(os::mkdir("1/2"));
ASSERT_SOME(os::mkdir("1/3"));
@@ -267,7 +267,7 @@ TEST_F(FilesTest, BrowseTest)
TEST_F(FilesTest, DownloadTest)
{
Files files;
- process::UPID upid("files", process::ip(), process::port());
+ process::UPID upid("files", process::node());
// This is a one-pixel black gif image.
const unsigned char gifData[] = {
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/gc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index f7747e2..8618ae1 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -458,7 +458,7 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedFramework)
ASSERT_FALSE(os::exists(frameworkDir));
- process::UPID filesUpid("files", process::ip(), process::port());
+ process::UPID filesUpid("files", process::node());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(filesUpid, "browse.json", "path=" + frameworkDir));
@@ -559,7 +559,7 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
// Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
- process::UPID files("files", process::ip(), process::port());
+ process::UPID files("files", process::node());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(files, "browse.json", "path=" + executorDir));
@@ -674,7 +674,7 @@ TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
// Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
- process::UPID files("files", process::ip(), process::port());
+ process::UPID files("files", process::node());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(files, "browse.json", "path=" + executorDir));
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/logging_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/logging_tests.cpp b/src/tests/logging_tests.cpp
index 58e9b33..22a785e 100644
--- a/src/tests/logging_tests.cpp
+++ b/src/tests/logging_tests.cpp
@@ -37,8 +37,7 @@ TEST(LoggingTest, Toggle)
{
process::PID<> pid;
pid.id = "logging";
- pid.ip = process::ip();
- pid.port = process::port();
+ pid.node = process::node();
process::Future<Response> response = process::http::get(pid, "toggle");
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index 8f7fb18..d847a30 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -130,8 +130,8 @@ TEST_F(MasterContenderDetectorTest, File)
TEST(BasicMasterContenderDetectorTest, Contender)
{
PID<Master> master;
- master.ip = 10000000;
- master.port = 10000;
+ master.node.ip = 10000000;
+ master.node.port = 10000;
MasterContender* contender = new StandaloneMasterContender();
@@ -155,8 +155,8 @@ TEST(BasicMasterContenderDetectorTest, Contender)
TEST(BasicMasterContenderDetectorTest, Detector)
{
PID<Master> master;
- master.ip = 10000000;
- master.port = 10000;
+ master.node.ip = 10000000;
+ master.node.port = 10000;
StandaloneMasterDetector detector;
@@ -199,8 +199,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
ZooKeeperMasterContender* contender = new ZooKeeperMasterContender(group);
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo master = internal::protobuf::createMasterInfo(pid);
contender->initialize(master);
@@ -257,8 +257,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
ZooKeeperMasterContender contender(url.get());
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo master = internal::protobuf::createMasterInfo(pid);
contender.initialize(master);
@@ -311,8 +311,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
new ZooKeeperMasterContender(url.get());
PID<Master> pid1;
- pid1.ip = 10000000;
- pid1.port = 10000;
+ pid1.node.ip = 10000000;
+ pid1.node.port = 10000;
MasterInfo master1 = internal::protobuf::createMasterInfo(pid1);
contender1->initialize(master1);
@@ -329,8 +329,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
ZooKeeperMasterContender contender2(url.get());
PID<Master> pid2;
- pid2.ip = 10000001;
- pid2.port = 10001;
+ pid2.node.ip = 10000001;
+ pid2.node.port = 10001;
MasterInfo master2 = internal::protobuf::createMasterInfo(pid2);
contender2.initialize(master2);
@@ -368,8 +368,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
AWAIT_READY(group1.join("data"));
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo master = internal::protobuf::createMasterInfo(pid);
// group2's password is wrong and operations on it will fail.
@@ -432,8 +432,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
ZooKeeperMasterContender contender(url.get());
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo master = internal::protobuf::createMasterInfo(pid);
contender.initialize(master);
@@ -508,8 +508,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
ZooKeeperMasterContender leaderContender(leaderGroup);
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo leader = internal::protobuf::createMasterInfo(pid);
leaderContender.initialize(leader);
@@ -529,8 +529,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
ZooKeeperMasterContender followerContender(followerGroup);
PID<Master> pid2;
- pid2.ip = 10000001;
- pid2.port = 10001;
+ pid2.node.ip = 10000001;
+ pid2.node.port = 10001;
MasterInfo follower = internal::protobuf::createMasterInfo(pid2);
followerContender.initialize(follower);
@@ -618,8 +618,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ASSERT_SOME(url);
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo leader = internal::protobuf::createMasterInfo(pid);
// Create the group instance so we can expire its session.
@@ -647,8 +647,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
// Simulate a following master.
PID<Master> pid2;
- pid2.ip = 10000001;
- pid2.port = 10001;
+ pid2.node.ip = 10000001;
+ pid2.node.port = 10001;
MasterInfo follower = internal::protobuf::createMasterInfo(pid2);
ZooKeeperMasterDetector followerDetector(url.get());
@@ -694,8 +694,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
ASSERT_SOME(url);
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo master = internal::protobuf::createMasterInfo(pid);
ZooKeeperMasterContender masterContender(url.get());
@@ -755,8 +755,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterDetector leaderDetector(leaderGroup);
PID<Master> pid;
- pid.ip = 10000000;
- pid.port = 10000;
+ pid.node.ip = 10000000;
+ pid.node.port = 10000;
MasterInfo leader = internal::protobuf::createMasterInfo(pid);
leaderContender.initialize(leader);
@@ -775,8 +775,8 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterDetector followerDetector(followerGroup);
PID<Master> pid2;
- pid2.ip = 10000001;
- pid2.port = 10001;
+ pid2.node.ip = 10000001;
+ pid2.node.port = 10001;
MasterInfo follower = internal::protobuf::createMasterInfo(pid2);
followerContender.initialize(follower);
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index a6d1a4a..66423a9 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -964,8 +964,8 @@ TEST_F(MasterTest, MasterInfo)
driver.start();
AWAIT_READY(masterInfo);
- EXPECT_EQ(master.get().port, masterInfo.get().port());
- EXPECT_EQ(master.get().ip, masterInfo.get().ip());
+ EXPECT_EQ(master.get().node.port, masterInfo.get().port());
+ EXPECT_EQ(master.get().node.ip, masterInfo.get().ip());
driver.stop();
driver.join();
@@ -1022,8 +1022,8 @@ TEST_F(MasterTest, MasterInfoOnReElection)
AWAIT_READY(disconnected);
AWAIT_READY(masterInfo);
- EXPECT_EQ(master.get().port, masterInfo.get().port());
- EXPECT_EQ(master.get().ip, masterInfo.get().ip());
+ EXPECT_EQ(master.get().node.port, masterInfo.get().port());
+ EXPECT_EQ(master.get().node.ip, masterInfo.get().ip());
// The re-registered framework should get offers.
AWAIT_READY(resourceOffers2);
@@ -2138,8 +2138,8 @@ TEST_F(MasterTest, MaxExecutorsPerSlave)
driver.start();
AWAIT_READY(masterInfo);
- EXPECT_EQ(master.get().port, masterInfo.get().port());
- EXPECT_EQ(master.get().ip, masterInfo.get().ip());
+ EXPECT_EQ(master.get().node.port, masterInfo.get().port());
+ EXPECT_EQ(master.get().node.ip, masterInfo.get().ip());
driver.stop();
driver.join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/76bfb493/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 4b950e1..3b02619 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -219,7 +219,7 @@ TEST(MonitorTest, Statistics)
// Now wait for ResouorceMonitorProcess::watch to finish.
process::Clock::settle();
- process::UPID upid("monitor", process::ip(), process::port());
+ process::UPID upid("monitor", process::node());
// Request the statistics, this will ask the isolator.
Future<Response> response = process::http::get(upid, "statistics.json");