You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/09/07 03:09:10 UTC
mesos git commit: Removed garbage collector.
Repository: mesos
Updated Branches:
refs/heads/master f07d3a7f5 -> e04d3cf81
Removed garbage collector.
The garbage collector had at least two bugs:
(1) If someone dispatched `manage()` twice in a row, the process we're
waiting for would get overwritten, which can wreak havoc depending
on when the calls to `link()` happen.
(2) The garbage collector deleted a process as soon as it received an
exited event for it, rather than actually doing a `wait()`.
The simpler implementation that this patch introduces is to have the
runtime itself delete a managed process right after doing
`ProcessManager::cleanup()`.
Review: https://reviews.apache.org/r/62053
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e04d3cf8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e04d3cf8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e04d3cf8
Branch: refs/heads/master
Commit: e04d3cf81f542d4d808b0f7eb3247bd34fbc976e
Parents: f07d3a7
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Sep 2 08:00:12 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Sep 6 20:08:55 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/Makefile.am | 1 -
3rdparty/libprocess/include/process/gc.hpp | 55 -----
3rdparty/libprocess/include/process/process.hpp | 8 +-
3rdparty/libprocess/src/process.cpp | 229 +++++++++----------
3rdparty/libprocess/src/tests/process_tests.cpp | 1 -
5 files changed, 109 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index c5dc0bb..94c7a72 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -29,7 +29,6 @@ nobase_include_HEADERS = \
process/filter.hpp \
process/firewall.hpp \
process/future.hpp \
- process/gc.hpp \
process/gmock.hpp \
process/grpc.hpp \
process/gtest.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/include/process/gc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gc.hpp b/3rdparty/libprocess/include/process/gc.hpp
deleted file mode 100644
index 603bb8b..0000000
--- a/3rdparty/libprocess/include/process/gc.hpp
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#ifndef __PROCESS_GC_HPP__
-#define __PROCESS_GC_HPP__
-
-#include <map>
-
-#include <process/process.hpp>
-
-
-namespace process {
-
-class GarbageCollector : public Process<GarbageCollector>
-{
-public:
- GarbageCollector() : ProcessBase("__gc__") {}
- virtual ~GarbageCollector() {}
-
- template <typename T>
- void manage(const T* t)
- {
- const ProcessBase* process = t;
- if (process != nullptr) {
- processes[process->self()] = process;
- link(process->self());
- }
- }
-
-protected:
- virtual void exited(const UPID& pid)
- {
- if (processes.count(pid) > 0) {
- const ProcessBase* process = processes[pid];
- processes.erase(pid);
- delete process;
- }
- }
-
-private:
- std::map<UPID, const ProcessBase*> processes;
-};
-
-} // namespace process {
-
-#endif // __PROCESS_GC_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 8cca782..dc3375c 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -475,6 +475,11 @@ private:
std::shared_ptr<Gate> gate;
+ // Whether or not the runtime should delete this process after it
+ // has terminated. Note that failure to spawn the process will leave
+ // the process unmanaged and thus it may leak!
+ bool manage = false;
+
// Process PID.
UPID pid;
};
@@ -564,7 +569,8 @@ long workers();
* Spawn a new process.
*
* @param process Process to be spawned.
- * @param manage Whether process should get garbage collected.
+ * @param manage Whether process should get deleted by the runtime
+ * after terminating.
*/
UPID spawn(ProcessBase* process, bool manage = false);
http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0bfc6d6..7aba8f0 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -70,7 +70,6 @@
#include <process/executor.hpp>
#include <process/filter.hpp>
#include <process/future.hpp>
-#include <process/gc.hpp>
#include <process/help.hpp>
#include <process/id.hpp>
#include <process/io.hpp>
@@ -511,8 +510,8 @@ public:
~ProcessManager();
// Prevents any further processes from spawning and terminates all
- // running processes. The special `gc` process will be terminated
- // last. Then joins all processing threads and stops the event loop.
+ // running processes. Then joins all processing threads and stops
+ // the event loop.
//
// This is a prerequisite for finalizing the `SocketManager`.
void finalize();
@@ -663,9 +662,6 @@ static AuthorizationCallbacks* authorization_callbacks = nullptr;
// Global route that returns process information.
static Route* processes_route = nullptr;
-// Global garbage collector.
-GarbageCollector* gc = nullptr;
-
// Global help.
PID<Help> help;
@@ -1316,16 +1312,15 @@ bool initialize(
future_accept = __s__->accept()
.onAny(lambda::bind(&internal::on_accept, lambda::_1));
- // TODO(benh): Make sure creating the garbage collector, logging
- // process, and profiler always succeeds and use supervisors to make
- // sure that none terminate.
+ // TODO(benh): Make sure creating the logging process, and profiler
+ // always succeeds and use supervisors to make sure that none
+ // terminate.
// For the global processes below, the order of initialization matters.
// Some global processes are necessary for the function of certain methods:
//
// process | Underpins this method
// --------|---------------------------
- // gc | process::spawn(..., true)
// help | ProcessBase::route(...)
// metrics | process::metrics::add(...)
//
@@ -1334,22 +1329,17 @@ bool initialize(
// graph shows what processes depend on which other processes.
// Processes in the same vertical group can be safely started in any order.
//
- // gc
- // |--help
- // | |--metrics
- // | | |--system
- // | | |--job_object_manager (Windows only)
- // | | |--All other processes
- // | |
- // | |--logging
- // | |--profiler
- // | |--processesRoute
+ // help
+ // |--metrics
+ // | |--system
+ // | |--job_object_manager (Windows only)
+ // | |--All other processes
// |
- // |--authentication_manager
-
- // Create global garbage collector process.
- gc = new GarbageCollector();
- spawn(gc);
+ // |--logging
+ // |--profiler
+ // |--processesRoute
+ //
+ // authenticator_manager
// Create global help process.
help = spawn(new Help(delegate), true);
@@ -1796,7 +1786,10 @@ void SocketManager::finalize()
// We require all processes to be terminated prior to finalizing the
// `SocketManager`. This simplifies the finalization logic as we do not
// have to worry about sockets or links being created during cleanup.
- CHECK(gc == nullptr);
+ //
+ // TODO(benh): can't do the following anymore, need another way:
+ //
+ // CHECK(gc == nullptr);
int_fd socket = -1;
// Close each socket.
@@ -2653,14 +2646,14 @@ void SocketManager::exited(const Address& address)
void SocketManager::exited(ProcessBase* process)
{
- // An exited event is enough to cause the process to get deleted
- // (e.g., by the garbage collector), which means we can't
- // dereference process (or even use the address) after we enqueue at
- // least one exited event. Thus, we save the process pid.
+ // TODO(benh): an exited event might cause `process` to get deleted
+ // (e.g., by someone who did a `link()`), even though they should
+ // really be doing a `wait()`. To be on the safe side here we save
+ // `process->pid` and the current time of the process so we can
+ // avoid dereferencing `process` after we enqueue at least one
+ // exited event. Really we should only store a `UPID` in these data
+ // structures and not use `ProcessBase` at all!
const UPID pid = process->pid;
-
- // Likewise, we need to save the current time of the process so we
- // can update the clocks of linked processes as appropriate.
const Time time = Clock::now(process);
synchronized (mutex) {
@@ -2780,8 +2773,6 @@ ProcessManager::~ProcessManager() {}
void ProcessManager::finalize()
{
- CHECK(gc != nullptr);
-
// Prevent anymore processes from being spawned.
finalizing.store(true);
@@ -2789,10 +2780,6 @@ void ProcessManager::finalize()
// is erased from `processes` in ProcessManager::cleanup(). Don't hold
// the lock or process the whole map as terminating one process might
// trigger other terminations.
- //
- // We skip the GC process in this loop and instead terminate it last.
- // This ensures that the GC process is running whenever we terminate
- // any GC-managed process, which is necessary to prevent leaking.
while (true) {
// NOTE: We terminate by `UPID` rather than `ProcessBase` as the
// process may terminate between the synchronized section below
@@ -2802,21 +2789,12 @@ void ProcessManager::finalize()
UPID pid;
synchronized (processes_mutex) {
- ProcessBase* process = nullptr;
-
- foreachvalue (ProcessBase* candidate, processes) {
- if (candidate == gc) {
- continue;
- }
-
- process = candidate;
- pid = candidate->self();
+ if (processes.empty()) {
break;
}
- if (process == nullptr) {
- break;
- }
+ // Grab the `UPID` for the next process we'll terminate.
+ pid = processes.values().front()->self();
}
// Terminate this process but do not inject the message,
@@ -2825,15 +2803,6 @@ void ProcessManager::finalize()
process::wait(pid);
}
- // Terminate `gc`.
- process::terminate(gc, false);
- process::wait(gc);
-
- synchronized (processes_mutex) {
- delete gc;
- gc = nullptr;
- }
-
// Send signal to all processing threads to stop running.
joining_threads.store(true);
runq.decomission();
@@ -3205,19 +3174,9 @@ bool ProcessManager::deliver(
UPID ProcessManager::spawn(ProcessBase* process, bool manage)
{
- CHECK(process != nullptr);
-
- if (process->state.load() != ProcessBase::State::BOTTOM) {
- LOG(WARNING)
- << "Attempted to spawn a process (" << process->self()
- << ") that has already been initialized";
+ CHECK_NOTNULL(process);
- if (manage) {
- delete process;
- }
-
- return UPID();
- }
+ bool spawned = false;
// If the `ProcessManager` is cleaning itself up, no further processes
// may be spawned.
@@ -3225,44 +3184,43 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
LOG(WARNING)
<< "Attempted to spawn a process (" << process->self()
<< ") after finalizing libprocess!";
+ } else if (process->state.load() != ProcessBase::State::BOTTOM) {
+ LOG(WARNING)
+ << "Attempted to spawn a process (" << process->self()
+ << ") that has already been initialized";
+ } else {
+ synchronized (processes_mutex) {
+ if (processes.count(process->pid.id) > 0) {
+ LOG(WARNING)
+ << "Attempted to spawn already running process " << process->pid;
+ } else {
+ processes[process->pid.id] = process;
+
+ // NOTE: we set process reference on it's `UPID` _after_ we've
+ // spawned so that we make sure that we'll take the
+ // `ProcessManager::use()` code path in the event that we
+ // aren't able to spawn the process. This is important in
+ // circumstances where there are multiple processes with the
+ // same ID because the semantics that people have come to
+ // expect from libprocess is that a `UPID` should "resolve" to
+ // the already spawned process rather than a process that has
+ // the same name but hasn't yet been spawned.
+ process->pid.reference = process->reference;
+
+ spawned = true;
+ }
+ }
+ }
+ if (!spawned) {
if (manage) {
delete process;
}
-
return UPID();
}
- synchronized (processes_mutex) {
- if (processes.count(process->pid.id) > 0) {
- LOG(WARNING)
- << "Attempted to spawn already running process " << process->pid;
-
- // TODO(benh): we should be deleting `process` here, this is a
- // long standing bug. This isn't straightforward because we
- // can't delete it within the `synchronized (processes_mutex)`
- // block.
-
- return UPID();
- } else {
- processes[process->pid.id] = process;
-
- // NOTE: we set process reference on it's `UPID` _after_ we've
- // spawned so that we make sure that we'll take the
- // `ProcessManager::use()` code path in the event that we aren't
- // able to spawn the process. This is important in circumstances
- // where there are multiple processes with the same ID because
- // the semantics that people have come to expect from libprocess
- // is that a `UPID` should "resolve" to the already spawned
- // process rather than a process that has the same name but
- // hasn't yet been spawned.
- process->pid.reference = process->reference;
- }
- }
-
- // Use the garbage collector if requested.
if (manage) {
- dispatch(gc->self(), &GarbageCollector::manage<ProcessBase>, process);
+ process->manage = true;
}
// We save the PID before enqueueing the process to avoid the race
@@ -3297,6 +3255,9 @@ void ProcessManager::resume(ProcessBase* process)
if (state == ProcessBase::State::BOTTOM) {
try { process->initialize(); }
catch (...) { terminate = true; }
+
+ state = ProcessBase::State::READY;
+ process->state.store(state);
}
while (!terminate && !blocked) {
@@ -3393,18 +3354,27 @@ void ProcessManager::resume(ProcessBase* process)
}
delete event;
-
- if (terminate) {
- cleanup(process);
- }
}
}
- // TODO(benh): If `terminate` was set to true when we initialized
- // then we'll never actually cleanup! This bug has been here a long
- // time!!!
+ // Must read and store if we are managing `process` because in the
+ // event we are not managing `process` it might get deallocated
+ // after we open the gate in `ProcessManager::cleanup()` and thus
+ // can't be dereferenced.
+ bool manage = process->manage;
+
+ if (terminate) {
+ cleanup(process);
+ }
__process__ = nullptr;
+
+ // Need to delete the process _after_ we've set `__process__` back
+ // to `nullptr` otherwise during destruction we might execute code
+ // that uses/dereferences `__process__` erroneously.
+ if (terminate && manage) {
+ delete process;
+ }
}
@@ -3412,6 +3382,10 @@ void ProcessManager::cleanup(ProcessBase* process)
{
VLOG(2) << "Cleaning up " << process->pid;
+ // Invariant today is that all processes must be initialized and
+ // have their state transition to READY before being terminated.
+ CHECK(process->state.load() == ProcessBase::State::READY);
+
// First, set the terminating state so no more events will get
// enqueued and then decomission the event queue which will also
// delete all the pending events. We want to delete the events
@@ -3456,36 +3430,37 @@ void ProcessManager::cleanup(ProcessBase* process)
// Note that we don't remove the process from the clock during
// cleanup, but rather the clock is reset for a process when it is
// created (see ProcessBase::ProcessBase). We do this so that
- // SocketManager::exited can access the current time of the
+ // `SocketManager::exited()` can access the current time of the
// process to "order" exited events. TODO(benh): It might make
// sense to consider storing the time of the process as a field of
- // the class instead.
+ // the class instead. It probably also makes sense to pass the
+ // time to `SocketManager::exited()` rather than expect it to call
+ // into the clock.
// Now we tell the socket manager about this process exiting so
// that it can create exited events for linked processes. We
// _must_ do this while synchronized on processes because
// otherwise another process could attempt to link this process
- // and SocketManager::link would see that the processes doesn't
- // exist when it attempts to get a ProcessReference (since we
- // removed the process above) thus causing an exited event, which
- // could cause the process to get deleted (e.g., the garbage
- // collector might link _after_ the process has already been
- // removed from processes thus getting an exited event but we
- // don't want that exited event to fire and actually delete the
- // process until after we have used the process in
- // SocketManager::exited).
+ // and `SocketManager::link()` would see that the processes
+ // doesn't exist when it attempts to get a `ProcessReference`
+ // (since we removed the process above) thus causing an exited
+ // event, which could cause the process to get deleted if someone
+ // is not properly doing a `wait()` but just waiting for exited
+ // events.
socket_manager->exited(process);
// ***************************************************************
- // At this point we can no longer dereference the process since it
- // might already be deallocated (e.g., by the garbage collector).
+ // At this point we should avoid dereferencing `process` since it
+ // might already be deallocated if some code is treating exited
+ // events (which were just sent above) as an indication that
+ // `process` has terminated _instead_ of calling `wait()` which
+ // will only return _after_ we open the gate below.
// ***************************************************************
- // Note that we need to open the gate while synchronized on
- // processes because otherwise we might _open_ the gate before
- // another thread _approaches_ the gate causing that thread to
- // wait on _arrival_ to the gate forever (see
- // ProcessManager::wait).
+ // Note that we need to open the gate within `synchronized
+ // (processes_mutex)` so that there is a happens-before
+ // relationship with respect to a process terminating and another
+ // process starting with the same `UPID`.
CHECK(gate);
gate->open();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 8d36600..82efb2f 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -37,7 +37,6 @@
#include <process/executor.hpp>
#include <process/filter.hpp>
#include <process/future.hpp>
-#include <process/gc.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/network.hpp>