You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/12/09 23:07:05 UTC
[1/2] mesos git commit: Fixed a long-standing performance issue in
libprocess' SocketManager.
Repository: mesos
Updated Branches:
refs/heads/master e6be44560 -> f8fc9d0a9
Fixed a long-standing performance issue in libprocess' SocketManager.
Review: https://reviews.apache.org/r/28838
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f8fc9d0a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f8fc9d0a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f8fc9d0a
Branch: refs/heads/master
Commit: f8fc9d0a901777ecf648c0a4991b847104a63b0d
Parents: f153718
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Dec 8 22:18:25 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Dec 9 14:02:39 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 91 +++++++++++++++++++++++---------
1 file changed, 67 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f8fc9d0a/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b87ac22..0b2a2c1 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -287,8 +287,18 @@ public:
void exited(ProcessBase* process);
private:
- // Map from UPID (local/remote) to process.
- map<UPID, set<ProcessBase*> > links;
+ // TODO(bmahler): Leverage a bidirectional multimap instead, or
+ // hide the complexity of manipulating 'links' through methods.
+ struct
+ {
+ // For links, we maintain a bidirectional mapping between the
+ // "linkers" (Processes) and the "linkees" (remote / local UPIDs).
+ // For remote nodes, we also need a mapping to the linkees on the
+ // node, because socket closure only notifies at the node level.
+ hashmap<UPID, hashset<ProcessBase*>> linkers;
+ hashmap<ProcessBase*, hashset<UPID>> linkees;
+ hashmap<Node, hashset<UPID>> remotes;
+ } links;
// Collection of all actice sockets.
map<int, Socket> sockets;
@@ -1559,7 +1569,11 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
bool connect = false;
synchronized (this) {
- links[to].insert(process);
+ links.linkers[to].insert(process);
+ links.linkees[process].insert(to);
+ if (to.node != __node__) {
+ links.remotes[to.node].insert(to);
+ }
// Check if node is remote and there isn't a persistant link.
if (to.node != __node__ && persists.count(to.node) == 0) {
@@ -2016,20 +2030,30 @@ void SocketManager::exited(const Node& node)
// ourselves that the accesses to each Process object will always be
// valid.
synchronized (this) {
- list<UPID> removed;
- // Look up all linked processes.
- foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
- if (linkee.node == node) {
- foreach (ProcessBase* linker, processes) {
- linker->enqueue(new ExitedEvent(linkee));
+ if (!links.remotes.contains(node)) {
+ return; // No linkees for this node!
+ }
+
+ foreach (const UPID& linkee, links.remotes[node]) {
+ // Find and notify the linkers.
+ CHECK(links.linkers.contains(linkee));
+
+ foreach (ProcessBase* linker, links.linkers[linkee]) {
+ linker->enqueue(new ExitedEvent(linkee));
+
+ // Remove the linkee pid from the linker.
+ CHECK(links.linkees.contains(linker));
+
+ links.linkees[linker].erase(linkee);
+ if (links.linkees[linker].empty()) {
+ links.linkees.erase(linker);
}
- removed.push_back(linkee);
}
- }
- foreach (const UPID& pid, removed) {
- links.erase(pid);
+ links.linkers.erase(linkee);
}
+
+ links.remotes.erase(node);
}
}
@@ -2047,21 +2071,40 @@ void SocketManager::exited(ProcessBase* process)
const Time time = Clock::now(process);
synchronized (this) {
- // Iterate through the links, removing any links the process might
- // have had and creating exited events for any linked processes.
- foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
- processes.erase(process);
-
- if (linkee == pid) {
- foreach (ProcessBase* linker, processes) {
- CHECK(linker != process) << "Process linked with itself";
- Clock::update(linker, time);
- linker->enqueue(new ExitedEvent(linkee));
+ // If this process had linked to anything, we need to clean
+ // up any pointers to it.
+ if (links.linkees.contains(process)) {
+ foreach (const UPID& linkee, links.linkees[process]) {
+ CHECK(links.linkers.contains(linkee));
+
+ links.linkers[linkee].erase(process);
+ if (links.linkers[linkee].empty()) {
+ links.linkers.erase(linkee);
}
}
+ links.linkees.erase(process);
+ }
+
+ // Find the linkers to notify.
+ if (!links.linkers.contains(pid)) {
+ return; // No linkers for this process!
+ }
+
+ foreach (ProcessBase* linker, links.linkers[pid]) {
+ CHECK(linker != process) << "Process linked with itself";
+ Clock::update(linker, time);
+ linker->enqueue(new ExitedEvent(pid));
+
+ // Remove the linkee pid from the linker.
+ CHECK(links.linkees.contains(linker));
+
+ links.linkees[linker].erase(pid);
+ if (links.linkees[linker].empty()) {
+ links.linkees.erase(linker);
+ }
}
- links.erase(pid);
+ links.linkers.erase(pid);
}
}
[2/2] mesos git commit: Added a hash function for Node.
Posted by bm...@apache.org.
Added a hash function for Node.
Review: https://reviews.apache.org/r/28869
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f1537180
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f1537180
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f1537180
Branch: refs/heads/master
Commit: f1537180747774bbb2e02062f89614be208327d7
Parents: e6be445
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Dec 9 12:41:33 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Dec 9 14:02:39 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/node.hpp | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f1537180/3rdparty/libprocess/include/process/node.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/node.hpp b/3rdparty/libprocess/include/process/node.hpp
index eb48c54..173eb8a 100644
--- a/3rdparty/libprocess/include/process/node.hpp
+++ b/3rdparty/libprocess/include/process/node.hpp
@@ -1,13 +1,17 @@
#ifndef __PROCESS_NODE_HPP__
#define __PROCESS_NODE_HPP__
-#include <arpa/inet.h>
+#include <stdint.h>
#include <unistd.h>
-#include <sstream>
+#include <arpa/inet.h>
#include <glog/logging.h>
+#include <sstream>
+
+#include <boost/functional/hash.hpp>
+
namespace process {
// Represents a remote "node" (encapsulates IP address and port).
@@ -41,6 +45,7 @@ public:
uint16_t port;
};
+
inline std::ostream& operator << (std::ostream& stream, const Node& node)
{
char ip[INET_ADDRSTRLEN];
@@ -52,6 +57,16 @@ inline std::ostream& operator << (std::ostream& stream, const Node& node)
return stream;
}
+
+// UPID hash value (for example, to use in Boost's unordered maps).
+inline std::size_t hash_value(const Node& node)
+{
+ size_t seed = 0;
+ boost::hash_combine(seed, node.ip);
+ boost::hash_combine(seed, node.port);
+ return seed;
+}
+
} // namespace process {
#endif // __PROCESS_NODE_HPP__