You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/08/01 21:03:03 UTC

[05/16] mesos git commit: Cached a reference to a ProcessBase in every UPID.

Cached a reference to a ProcessBase in every UPID.

Review: https://reviews.apache.org/r/61060


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c70a5647
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c70a5647
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c70a5647

Branch: refs/heads/master
Commit: c70a564731aec2f409729df89c1bc4b668a74760
Parents: 3924617
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 21 17:56:17 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Aug 1 14:01:51 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/pid.hpp     | 37 +++++++++--
 3rdparty/libprocess/include/process/process.hpp |  8 +--
 3rdparty/libprocess/src/pid.cpp                 |  9 +--
 3rdparty/libprocess/src/process.cpp             | 47 +++++++++++---
 3rdparty/libprocess/src/process_reference.hpp   | 65 ++++----------------
 5 files changed, 89 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 6ed936d..88ba64e 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -40,20 +40,21 @@ struct UPID
 {
   UPID() = default;
 
-  UPID(const UPID& that)
-    : id(that.id), address(that.address), addresses(that.addresses) {}
+  UPID(const UPID& that) = default;
+
+  UPID(UPID&& that) = default;
 
   UPID(const char* id_, const net::IP& ip_, uint16_t port_)
-    : id(id_), address(ip_, port_) {}
+    : id(id_), address(ip_, port_) { resolve(); }
 
   UPID(const char* id_, const network::inet::Address& address_)
-    : id(id_), address(address_) {}
+    : id(id_), address(address_) { resolve(); }
 
   UPID(const std::string& id_, const net::IP& ip_, uint16_t port_)
-    : id(id_), address(ip_, port_) {}
+    : id(id_), address(ip_, port_) { resolve(); }
 
   UPID(const std::string& id_, const network::inet::Address& address_)
-    : id(id_), address(address_) {}
+    : id(id_), address(address_) { resolve(); }
 
   /*implicit*/ UPID(const char* s);
 
@@ -61,6 +62,10 @@ struct UPID
 
   /*implicit*/ UPID(const ProcessBase& process);
 
+  UPID& operator=(const UPID& that) = default;
+
+  UPID& operator=(UPID&& that) = default;
+
   operator std::string() const;
 
   operator bool() const
@@ -92,6 +97,10 @@ struct UPID
     return !(*this == that);
   }
 
+  // Attempts to resolve and cache a weak pointer to the ProcessBase
+  // to which this UPID refers.
+  void resolve();
+
   std::string id;
 
   // TODO(asridharan): Ideally, the following `address` field should be of
@@ -118,6 +127,17 @@ struct UPID
   {
     Option<network::inet6::Address> v6;
   } addresses = {None()};
+
+protected:
+  friend class ProcessBase;
+  friend class ProcessManager;
+
+  // A weak pointer to the actual process used to optimize enqueuing
+  // events without having to go through a shared lock in the
+  // `ProcessManager`. This is `None` if someone creates a UPID and
+  // doesn't call `resolve()` or if `resolve()` doesn't find a valid
+  // process (i.e., the process hasn't started or has terminated).
+  Option<std::weak_ptr<ProcessBase*>> reference = None();
 };
 
 
@@ -150,6 +170,10 @@ struct UPID
 template <typename T = ProcessBase>
 struct PID : UPID
 {
+  // Need to declare PID<U> as a friend in order to write `reference`.
+  template <typename U>
+  friend class PID;
+
   PID() : UPID() {}
 
   /*implicit*/ PID(const T* t) : UPID(static_cast<const ProcessBase&>(*t)) {}
@@ -166,6 +190,7 @@ struct PID : UPID
     pid.id = id;
     pid.address = address;
     pid.addresses = addresses;
+    pid.reference = reference;
     return pid;
   }
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 9a41ed5..42cd931 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -76,7 +76,7 @@ public:
 
   virtual ~ProcessBase();
 
-  UPID self() const { return pid; }
+  const UPID& self() const { return pid; }
 
 protected:
   /**
@@ -379,7 +379,6 @@ protected:
 private:
   friend class SocketManager;
   friend class ProcessManager;
-  friend class ProcessReference;
   friend void* schedule(void*);
 
   // Process states.
@@ -464,8 +463,9 @@ private:
   // a pointer so we can hide the implementation of `EventQueue`.
   std::unique_ptr<EventQueue> events;
 
-  // Active references.
-  std::atomic_long refs;
+  // NOTE: this is a shared pointer to a _pointer_, hence this is not
+  // responsible for the ProcessBase itself.
+  std::shared_ptr<ProcessBase*> reference;
 
   std::shared_ptr<Gate> gate;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index 634ac44..fba6116 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -58,12 +58,7 @@ UPID::UPID(const string& s)
 
 
 // TODO(benh): Make this inline-able (cyclic dependency issues).
-UPID::UPID(const ProcessBase& process)
-{
-  id = process.self().id;
-  address = process.self().address;
-  addresses = process.self().addresses;
-}
+UPID::UPID(const ProcessBase& process) : UPID(process.self()) {}
 
 
 UPID::operator string() const
@@ -145,6 +140,8 @@ istream& operator>>(istream& stream, UPID& pid)
   pid.id = id;
   pid.address = address;
 
+  pid.resolve();
+
   return stream;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d6b8db3..59068d5 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2851,19 +2851,22 @@ long ProcessManager::init_threads()
 
 ProcessReference ProcessManager::use(const UPID& pid)
 {
+  if (pid.reference.isSome()) {
+    if (std::shared_ptr<ProcessBase*> reference = pid.reference->lock()) {
+      return ProcessReference(std::move(reference));
+    }
+  }
+
   if (pid.address == __address__) {
     synchronized (processes_mutex) {
       Option<ProcessBase*> process = processes.get(pid.id);
       if (process.isSome()) {
-        // Note that the ProcessReference constructor _must_ get
-        // called while holding the lock on processes so that waiting
-        // for references is atomic (i.e., race free).
-        return ProcessReference(process.get());
+        return ProcessReference(process.get()->reference);
       }
     }
   }
 
-  return ProcessReference(nullptr);
+  return ProcessReference();
 }
 
 
@@ -3120,6 +3123,7 @@ bool ProcessManager::deliver(
   if (ProcessReference receiver = use(to)) {
     return deliver(receiver, event, sender);
   }
+
   VLOG(2) << "Dropping event for process " << to;
 
   delete event;
@@ -3147,9 +3151,21 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
 
   synchronized (processes_mutex) {
     if (processes.count(process->pid.id) > 0) {
+      VLOG(1) << "Attempting to spawn already spawned process " << process->pid;
       return UPID();
     } else {
       processes[process->pid.id] = process;
+
+      // NOTE: we set process reference on its `UPID` _after_ we've
+      // spawned so that we make sure that we'll take the
+      // `ProcessManager::use()` code path in the event that we aren't
+      // able to spawn the process. This is important in circumstances
+      // where there are multiple processes with the same ID because
+      // the semantics that people have come to expect from libprocess
+      // is that a `UPID` should "resolve" to the already spawned
+      // process rather than a process that has the same name but
+      // hasn't yet been spawned.
+      process->pid.reference = process->reference;
     }
   }
 
@@ -3329,8 +3345,16 @@ void ProcessManager::cleanup(ProcessBase* process)
 
   // Remove process.
   synchronized (processes_mutex) {
+    // Reset the reference so that we don't keep giving out references
+    // in `ProcessManager::use`.
+    //
+    // NOTE: this must be done from within the `processes_mutex` since
+    // that is where we read it and this is considered a write.
+    process->reference.reset();
+
     // Wait for all process references to get cleaned up.
-    while (process->refs.load() > 0) {
+    CHECK_SOME(process->pid.reference);
+    while (!process->pid.reference->expired()) {
 #if defined(__i386__) || defined(__x86_64__)
       asm ("pause");
 #endif
@@ -3667,7 +3691,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
 
 ProcessBase::ProcessBase(const string& id)
   : events(new EventQueue()),
-    refs(0),
+    reference(std::make_shared<ProcessBase*>(this)),
     gate(std::make_shared<Gate>())
 {
   process::initialize();
@@ -4079,6 +4103,15 @@ ProcessBase:: operator JSON::Object()
 }
 
 
+void UPID::resolve()
+{
+  if (ProcessReference process = process_manager->use(*this)) {
+    reference = process.reference;
+  }
+  // Otherwise keep it `None` to force look ups in the future!
+}
+
+
 UPID spawn(ProcessBase* process, bool manage)
 {
   process::initialize();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/src/process_reference.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process_reference.hpp b/3rdparty/libprocess/src/process_reference.hpp
index a6d6d20..d99e9f1 100644
--- a/3rdparty/libprocess/src/process_reference.hpp
+++ b/3rdparty/libprocess/src/process_reference.hpp
@@ -17,78 +17,35 @@
 
 namespace process {
 
-// Provides reference counting semantics for a process pointer.
 class ProcessReference
 {
 public:
-  ProcessReference() : process(nullptr) {}
+  ProcessReference() = default;
 
-  ~ProcessReference()
-  {
-    cleanup();
-  }
+  ProcessReference(std::shared_ptr<ProcessBase*>&& reference)
+    : reference(std::move(reference)) {}
 
-  ProcessReference(const ProcessReference& that)
-  {
-    copy(that);
-  }
-
-  ProcessReference& operator=(const ProcessReference& that)
-  {
-    if (this != &that) {
-      cleanup();
-      copy(that);
-    }
-    return *this;
-  }
+  ProcessReference(const std::shared_ptr<ProcessBase*>& reference)
+    : reference(reference) {}
 
   ProcessBase* operator->() const
   {
-    return process;
+    CHECK(reference);
+    return *reference;
   }
 
   operator ProcessBase*() const
   {
-    return process;
+    CHECK(reference);
+    return *reference;
   }
 
   operator bool() const
   {
-    return process != nullptr;
-  }
-
-private:
-  friend class ProcessManager; // For ProcessManager::use.
-
-  explicit ProcessReference(ProcessBase* _process)
-    : process(_process)
-  {
-    if (process != nullptr) {
-      process->refs.fetch_add(1);
-    }
-  }
-
-  void copy(const ProcessReference& that)
-  {
-    process = that.process;
-
-    if (process != nullptr) {
-      // There should be at least one reference to the process, so
-      // we don't need to worry about checking if it's exiting or
-      // not, since we know we can always create another reference.
-      CHECK(process->refs.load() > 0);
-      process->refs.fetch_add(1);
-    }
-  }
-
-  void cleanup()
-  {
-    if (process != nullptr) {
-      process->refs.fetch_sub(1);
-    }
+    return (bool) reference;
   }
 
-  ProcessBase* process;
+  std::shared_ptr<ProcessBase*> reference;
 };
 
 } // namespace process {