You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/08/01 21:03:03 UTC
[05/16] mesos git commit: Cached a reference to a ProcessBase in
every UPID.
Cached a reference to a ProcessBase in every UPID.
Review: https://reviews.apache.org/r/61060
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c70a5647
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c70a5647
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c70a5647
Branch: refs/heads/master
Commit: c70a564731aec2f409729df89c1bc4b668a74760
Parents: 3924617
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 21 17:56:17 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Aug 1 14:01:51 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/pid.hpp | 37 +++++++++--
3rdparty/libprocess/include/process/process.hpp | 8 +--
3rdparty/libprocess/src/pid.cpp | 9 +--
3rdparty/libprocess/src/process.cpp | 47 +++++++++++---
3rdparty/libprocess/src/process_reference.hpp | 65 ++++----------------
5 files changed, 89 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 6ed936d..88ba64e 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -40,20 +40,21 @@ struct UPID
{
UPID() = default;
- UPID(const UPID& that)
- : id(that.id), address(that.address), addresses(that.addresses) {}
+ UPID(const UPID& that) = default;
+
+ UPID(UPID&& that) = default;
UPID(const char* id_, const net::IP& ip_, uint16_t port_)
- : id(id_), address(ip_, port_) {}
+ : id(id_), address(ip_, port_) { resolve(); }
UPID(const char* id_, const network::inet::Address& address_)
- : id(id_), address(address_) {}
+ : id(id_), address(address_) { resolve(); }
UPID(const std::string& id_, const net::IP& ip_, uint16_t port_)
- : id(id_), address(ip_, port_) {}
+ : id(id_), address(ip_, port_) { resolve(); }
UPID(const std::string& id_, const network::inet::Address& address_)
- : id(id_), address(address_) {}
+ : id(id_), address(address_) { resolve(); }
/*implicit*/ UPID(const char* s);
@@ -61,6 +62,10 @@ struct UPID
/*implicit*/ UPID(const ProcessBase& process);
+ UPID& operator=(const UPID& that) = default;
+
+ UPID& operator=(UPID&& that) = default;
+
operator std::string() const;
operator bool() const
@@ -92,6 +97,10 @@ struct UPID
return !(*this == that);
}
+ // Attempts to resolve and cache a weak pointer to the ProcessBase
+ // to which this UPID refers.
+ void resolve();
+
std::string id;
// TODO(asridharan): Ideally, the following `address` field should be of
@@ -118,6 +127,17 @@ struct UPID
{
Option<network::inet6::Address> v6;
} addresses = {None()};
+
+protected:
+ friend class ProcessBase;
+ friend class ProcessManager;
+
+ // A weak pointer to the actual process used to optimize enqueuing
+ // events without having to go through a shared lock in the
+ // `ProcessManager`. This is `None` if someone creates a UPID and
+ // doesn't call `resolve()` or if `resolve()` doesn't find a valid
+ // process (i.e., the process hasn't started or has terminated).
+ Option<std::weak_ptr<ProcessBase*>> reference = None();
};
@@ -150,6 +170,10 @@ struct UPID
template <typename T = ProcessBase>
struct PID : UPID
{
+ // Need to declare PID<U> as a friend in order to write `reference`.
+ template <typename U>
+ friend class PID;
+
PID() : UPID() {}
/*implicit*/ PID(const T* t) : UPID(static_cast<const ProcessBase&>(*t)) {}
@@ -166,6 +190,7 @@ struct PID : UPID
pid.id = id;
pid.address = address;
pid.addresses = addresses;
+ pid.reference = reference;
return pid;
}
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 9a41ed5..42cd931 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -76,7 +76,7 @@ public:
virtual ~ProcessBase();
- UPID self() const { return pid; }
+ const UPID& self() const { return pid; }
protected:
/**
@@ -379,7 +379,6 @@ protected:
private:
friend class SocketManager;
friend class ProcessManager;
- friend class ProcessReference;
friend void* schedule(void*);
// Process states.
@@ -464,8 +463,9 @@ private:
// a pointer so we can hide the implementation of `EventQueue`.
std::unique_ptr<EventQueue> events;
- // Active references.
- std::atomic_long refs;
+ // NOTE: this is a shared pointer to a _pointer_, hence this is not
+ // responsible for the ProcessBase itself.
+ std::shared_ptr<ProcessBase*> reference;
std::shared_ptr<Gate> gate;
http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index 634ac44..fba6116 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -58,12 +58,7 @@ UPID::UPID(const string& s)
// TODO(benh): Make this inline-able (cyclic dependency issues).
-UPID::UPID(const ProcessBase& process)
-{
- id = process.self().id;
- address = process.self().address;
- addresses = process.self().addresses;
-}
+UPID::UPID(const ProcessBase& process) : UPID(process.self()) {}
UPID::operator string() const
@@ -145,6 +140,8 @@ istream& operator>>(istream& stream, UPID& pid)
pid.id = id;
pid.address = address;
+ pid.resolve();
+
return stream;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d6b8db3..59068d5 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2851,19 +2851,22 @@ long ProcessManager::init_threads()
ProcessReference ProcessManager::use(const UPID& pid)
{
+ if (pid.reference.isSome()) {
+ if (std::shared_ptr<ProcessBase*> reference = pid.reference->lock()) {
+ return ProcessReference(std::move(reference));
+ }
+ }
+
if (pid.address == __address__) {
synchronized (processes_mutex) {
Option<ProcessBase*> process = processes.get(pid.id);
if (process.isSome()) {
- // Note that the ProcessReference constructor _must_ get
- // called while holding the lock on processes so that waiting
- // for references is atomic (i.e., race free).
- return ProcessReference(process.get());
+ return ProcessReference(process.get()->reference);
}
}
}
- return ProcessReference(nullptr);
+ return ProcessReference();
}
@@ -3120,6 +3123,7 @@ bool ProcessManager::deliver(
if (ProcessReference receiver = use(to)) {
return deliver(receiver, event, sender);
}
+
VLOG(2) << "Dropping event for process " << to;
delete event;
@@ -3147,9 +3151,21 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
synchronized (processes_mutex) {
if (processes.count(process->pid.id) > 0) {
+ VLOG(1) << "Attempting to spawn already spawned process " << process->pid;
return UPID();
} else {
processes[process->pid.id] = process;
+
+ // NOTE: we set process reference on it's `UPID` _after_ we've
+ // spawned so that we make sure that we'll take the
+ // `ProcessManager::use()` code path in the event that we aren't
+ // able to spawn the process. This is important in circumstances
+ // where there are multiple processes with the same ID because
+ // the semantics that people have come to expect from libprocess
+ // is that a `UPID` should "resolve" to the already spawned
+ // process rather than a process that has the same name but
+ // hasn't yet been spawned.
+ process->pid.reference = process->reference;
}
}
@@ -3329,8 +3345,16 @@ void ProcessManager::cleanup(ProcessBase* process)
// Remove process.
synchronized (processes_mutex) {
+ // Reset the reference so that we don't keep giving out references
+ // in `ProcessManager::use`.
+ //
+ // NOTE: this must be done from within the `processes_mutex` since
+ // that is where we read it and this is considered a write.
+ process->reference.reset();
+
// Wait for all process references to get cleaned up.
- while (process->refs.load() > 0) {
+ CHECK_SOME(process->pid.reference);
+ while (!process->pid.reference->expired()) {
#if defined(__i386__) || defined(__x86_64__)
asm ("pause");
#endif
@@ -3667,7 +3691,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
ProcessBase::ProcessBase(const string& id)
: events(new EventQueue()),
- refs(0),
+ reference(std::make_shared<ProcessBase*>(this)),
gate(std::make_shared<Gate>())
{
process::initialize();
@@ -4079,6 +4103,15 @@ ProcessBase:: operator JSON::Object()
}
+void UPID::resolve()
+{
+ if (ProcessReference process = process_manager->use(*this)) {
+ reference = process.reference;
+ }
+ // Otherwise keep it `None` to force look ups in the future!
+}
+
+
UPID spawn(ProcessBase* process, bool manage)
{
process::initialize();
http://git-wip-us.apache.org/repos/asf/mesos/blob/c70a5647/3rdparty/libprocess/src/process_reference.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process_reference.hpp b/3rdparty/libprocess/src/process_reference.hpp
index a6d6d20..d99e9f1 100644
--- a/3rdparty/libprocess/src/process_reference.hpp
+++ b/3rdparty/libprocess/src/process_reference.hpp
@@ -17,78 +17,35 @@
namespace process {
-// Provides reference counting semantics for a process pointer.
class ProcessReference
{
public:
- ProcessReference() : process(nullptr) {}
+ ProcessReference() = default;
- ~ProcessReference()
- {
- cleanup();
- }
+ ProcessReference(std::shared_ptr<ProcessBase*>&& reference)
+ : reference(std::move(reference)) {}
- ProcessReference(const ProcessReference& that)
- {
- copy(that);
- }
-
- ProcessReference& operator=(const ProcessReference& that)
- {
- if (this != &that) {
- cleanup();
- copy(that);
- }
- return *this;
- }
+ ProcessReference(const std::shared_ptr<ProcessBase*>& reference)
+ : reference(reference) {}
ProcessBase* operator->() const
{
- return process;
+ CHECK(reference);
+ return *reference;
}
operator ProcessBase*() const
{
- return process;
+ CHECK(reference);
+ return *reference;
}
operator bool() const
{
- return process != nullptr;
- }
-
-private:
- friend class ProcessManager; // For ProcessManager::use.
-
- explicit ProcessReference(ProcessBase* _process)
- : process(_process)
- {
- if (process != nullptr) {
- process->refs.fetch_add(1);
- }
- }
-
- void copy(const ProcessReference& that)
- {
- process = that.process;
-
- if (process != nullptr) {
- // There should be at least one reference to the process, so
- // we don't need to worry about checking if it's exiting or
- // not, since we know we can always create another reference.
- CHECK(process->refs.load() > 0);
- process->refs.fetch_add(1);
- }
- }
-
- void cleanup()
- {
- if (process != nullptr) {
- process->refs.fetch_sub(1);
- }
+ return (bool) reference;
}
- ProcessBase* process;
+ std::shared_ptr<ProcessBase*> reference;
};
} // namespace process {