You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/12/09 23:07:05 UTC

[1/2] mesos git commit: Fixed a long-standing performance issue in libprocess' SocketManager.

Repository: mesos
Updated Branches:
  refs/heads/master e6be44560 -> f8fc9d0a9


Fixed a long-standing performance issue in libprocess' SocketManager.

Review: https://reviews.apache.org/r/28838


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f8fc9d0a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f8fc9d0a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f8fc9d0a

Branch: refs/heads/master
Commit: f8fc9d0a901777ecf648c0a4991b847104a63b0d
Parents: f153718
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Dec 8 22:18:25 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Dec 9 14:02:39 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 91 +++++++++++++++++++++++---------
 1 file changed, 67 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f8fc9d0a/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b87ac22..0b2a2c1 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -287,8 +287,18 @@ public:
   void exited(ProcessBase* process);
 
 private:
-  // Map from UPID (local/remote) to process.
-  map<UPID, set<ProcessBase*> > links;
+  // TODO(bmahler): Leverage a bidirectional multimap instead, or
+  // hide the complexity of manipulating 'links' through methods.
+  struct
+  {
+    // For links, we maintain a bidirectional mapping between the
+    // "linkers" (Processes) and the "linkees" (remote / local UPIDs).
+    // For remote nodes, we also need a mapping to the linkees on the
+    // node, because socket closure only notifies at the node level.
+    hashmap<UPID, hashset<ProcessBase*>> linkers;
+    hashmap<ProcessBase*, hashset<UPID>> linkees;
+    hashmap<Node, hashset<UPID>> remotes;
+  } links;
 
   // Collection of all active sockets.
   map<int, Socket> sockets;
@@ -1559,7 +1569,11 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
   bool connect = false;
 
   synchronized (this) {
-    links[to].insert(process);
+    links.linkers[to].insert(process);
+    links.linkees[process].insert(to);
+    if (to.node != __node__) {
+      links.remotes[to.node].insert(to);
+    }
 
     // Check if node is remote and there isn't a persistent link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
@@ -2016,20 +2030,30 @@ void SocketManager::exited(const Node& node)
   // ourselves that the accesses to each Process object will always be
   // valid.
   synchronized (this) {
-    list<UPID> removed;
-    // Look up all linked processes.
-    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
-      if (linkee.node == node) {
-        foreach (ProcessBase* linker, processes) {
-          linker->enqueue(new ExitedEvent(linkee));
+    if (!links.remotes.contains(node)) {
+      return; // No linkees for this node!
+    }
+
+    foreach (const UPID& linkee, links.remotes[node]) {
+      // Find and notify the linkers.
+      CHECK(links.linkers.contains(linkee));
+
+      foreach (ProcessBase* linker, links.linkers[linkee]) {
+        linker->enqueue(new ExitedEvent(linkee));
+
+        // Remove the linkee pid from the linker.
+        CHECK(links.linkees.contains(linker));
+
+        links.linkees[linker].erase(linkee);
+        if (links.linkees[linker].empty()) {
+          links.linkees.erase(linker);
         }
-        removed.push_back(linkee);
       }
-    }
 
-    foreach (const UPID& pid, removed) {
-      links.erase(pid);
+      links.linkers.erase(linkee);
     }
+
+    links.remotes.erase(node);
   }
 }
 
@@ -2047,21 +2071,40 @@ void SocketManager::exited(ProcessBase* process)
   const Time time = Clock::now(process);
 
   synchronized (this) {
-    // Iterate through the links, removing any links the process might
-    // have had and creating exited events for any linked processes.
-    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
-      processes.erase(process);
-
-      if (linkee == pid) {
-        foreach (ProcessBase* linker, processes) {
-          CHECK(linker != process) << "Process linked with itself";
-          Clock::update(linker, time);
-          linker->enqueue(new ExitedEvent(linkee));
+    // If this process had linked to anything, we need to clean
+    // up any pointers to it.
+    if (links.linkees.contains(process)) {
+      foreach (const UPID& linkee, links.linkees[process]) {
+        CHECK(links.linkers.contains(linkee));
+
+        links.linkers[linkee].erase(process);
+        if (links.linkers[linkee].empty()) {
+          links.linkers.erase(linkee);
         }
       }
+      links.linkees.erase(process);
+    }
+
+    // Find the linkers to notify.
+    if (!links.linkers.contains(pid)) {
+      return; // No linkers for this process!
+    }
+
+    foreach (ProcessBase* linker, links.linkers[pid]) {
+      CHECK(linker != process) << "Process linked with itself";
+      Clock::update(linker, time);
+      linker->enqueue(new ExitedEvent(pid));
+
+      // Remove the linkee pid from the linker.
+      CHECK(links.linkees.contains(linker));
+
+      links.linkees[linker].erase(pid);
+      if (links.linkees[linker].empty()) {
+        links.linkees.erase(linker);
+      }
     }
 
-    links.erase(pid);
+    links.linkers.erase(pid);
   }
 }
 


[2/2] mesos git commit: Added a hash function for Node.

Posted by bm...@apache.org.
Added a hash function for Node.

Review: https://reviews.apache.org/r/28869


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f1537180
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f1537180
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f1537180

Branch: refs/heads/master
Commit: f1537180747774bbb2e02062f89614be208327d7
Parents: e6be445
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Dec 9 12:41:33 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Dec 9 14:02:39 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/node.hpp | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f1537180/3rdparty/libprocess/include/process/node.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/node.hpp b/3rdparty/libprocess/include/process/node.hpp
index eb48c54..173eb8a 100644
--- a/3rdparty/libprocess/include/process/node.hpp
+++ b/3rdparty/libprocess/include/process/node.hpp
@@ -1,13 +1,17 @@
 #ifndef __PROCESS_NODE_HPP__
 #define __PROCESS_NODE_HPP__
 
-#include <arpa/inet.h>
+#include <stdint.h>
 #include <unistd.h>
 
-#include <sstream>
+#include <arpa/inet.h>
 
 #include <glog/logging.h>
 
+#include <sstream>
+
+#include <boost/functional/hash.hpp>
+
 namespace process {
 
 // Represents a remote "node" (encapsulates IP address and port).
@@ -41,6 +45,7 @@ public:
   uint16_t port;
 };
 
+
 inline std::ostream& operator << (std::ostream& stream, const Node& node)
 {
   char ip[INET_ADDRSTRLEN];
@@ -52,6 +57,16 @@ inline std::ostream& operator << (std::ostream& stream, const Node& node)
   return stream;
 }
 
+
+// Node hash value (for example, to use in Boost's unordered maps).
+inline std::size_t hash_value(const Node& node)
+{
+  size_t seed = 0;
+  boost::hash_combine(seed, node.ip);
+  boost::hash_combine(seed, node.port);
+  return seed;
+}
+
 } // namespace process {
 
 #endif // __PROCESS_NODE_HPP__