You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/09/07 03:09:10 UTC

mesos git commit: Removed garbage collector.

Repository: mesos
Updated Branches:
  refs/heads/master f07d3a7f5 -> e04d3cf81


Removed garbage collector.

The garbage collector had at least two bugs:

(1) If someone dispatched `manage()` twice in a row, the process we
    were waiting for would get overwritten, which can wreak havoc
    depending on when the calls to `link()` happen.

(2) The garbage collector deleted a process as soon as it received an
    exited event for it, rather than actually doing a `wait()`.

This patch introduces a simpler implementation: the runtime just
deletes a managed process after doing `ProcessManager::cleanup()`.

Review: https://reviews.apache.org/r/62053


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e04d3cf8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e04d3cf8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e04d3cf8

Branch: refs/heads/master
Commit: e04d3cf81f542d4d808b0f7eb3247bd34fbc976e
Parents: f07d3a7
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Sep 2 08:00:12 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Sep 6 20:08:55 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/Makefile.am         |   1 -
 3rdparty/libprocess/include/process/gc.hpp      |  55 -----
 3rdparty/libprocess/include/process/process.hpp |   8 +-
 3rdparty/libprocess/src/process.cpp             | 229 +++++++++----------
 3rdparty/libprocess/src/tests/process_tests.cpp |   1 -
 5 files changed, 109 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index c5dc0bb..94c7a72 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -29,7 +29,6 @@ nobase_include_HEADERS =		\
   process/filter.hpp			\
   process/firewall.hpp			\
   process/future.hpp			\
-  process/gc.hpp			\
   process/gmock.hpp			\
   process/grpc.hpp			\
   process/gtest.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/include/process/gc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gc.hpp b/3rdparty/libprocess/include/process/gc.hpp
deleted file mode 100644
index 603bb8b..0000000
--- a/3rdparty/libprocess/include/process/gc.hpp
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#ifndef __PROCESS_GC_HPP__
-#define __PROCESS_GC_HPP__
-
-#include <map>
-
-#include <process/process.hpp>
-
-
-namespace process {
-
-class GarbageCollector : public Process<GarbageCollector>
-{
-public:
-  GarbageCollector() : ProcessBase("__gc__") {}
-  virtual ~GarbageCollector() {}
-
-  template <typename T>
-  void manage(const T* t)
-  {
-    const ProcessBase* process = t;
-    if (process != nullptr) {
-      processes[process->self()] = process;
-      link(process->self());
-    }
-  }
-
-protected:
-  virtual void exited(const UPID& pid)
-  {
-    if (processes.count(pid) > 0) {
-      const ProcessBase* process = processes[pid];
-      processes.erase(pid);
-      delete process;
-    }
-  }
-
-private:
-  std::map<UPID, const ProcessBase*> processes;
-};
-
-} // namespace process {
-
-#endif // __PROCESS_GC_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 8cca782..dc3375c 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -475,6 +475,11 @@ private:
 
   std::shared_ptr<Gate> gate;
 
+  // Whether or not the runtime should delete this process after it
+  // has terminated. Note that failure to spawn the process will leave
+  // the process unmanaged and thus it may leak!
+  bool manage = false;
+
   // Process PID.
   UPID pid;
 };
@@ -564,7 +569,8 @@ long workers();
  * Spawn a new process.
  *
  * @param process Process to be spawned.
- * @param manage Whether process should get garbage collected.
+ * @param manage Whether process should get deleted by the runtime
+ *     after terminating.
  */
 UPID spawn(ProcessBase* process, bool manage = false);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0bfc6d6..7aba8f0 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -70,7 +70,6 @@
 #include <process/executor.hpp>
 #include <process/filter.hpp>
 #include <process/future.hpp>
-#include <process/gc.hpp>
 #include <process/help.hpp>
 #include <process/id.hpp>
 #include <process/io.hpp>
@@ -511,8 +510,8 @@ public:
   ~ProcessManager();
 
   // Prevents any further processes from spawning and terminates all
-  // running processes. The special `gc` process will be terminated
-  // last. Then joins all processing threads and stops the event loop.
+  // running processes. Then joins all processing threads and stops
+  // the event loop.
   //
   // This is a prerequisite for finalizing the `SocketManager`.
   void finalize();
@@ -663,9 +662,6 @@ static AuthorizationCallbacks* authorization_callbacks = nullptr;
 // Global route that returns process information.
 static Route* processes_route = nullptr;
 
-// Global garbage collector.
-GarbageCollector* gc = nullptr;
-
 // Global help.
 PID<Help> help;
 
@@ -1316,16 +1312,15 @@ bool initialize(
   future_accept = __s__->accept()
     .onAny(lambda::bind(&internal::on_accept, lambda::_1));
 
-  // TODO(benh): Make sure creating the garbage collector, logging
-  // process, and profiler always succeeds and use supervisors to make
-  // sure that none terminate.
+  // TODO(benh): Make sure creating the logging process, and profiler
+  // always succeeds and use supervisors to make sure that none
+  // terminate.
 
   // For the global processes below, the order of initialization matters.
   // Some global processes are necessary for the function of certain methods:
   //
   //   process | Underpins this method
   //   --------|---------------------------
-  //   gc      | process::spawn(..., true)
   //   help    | ProcessBase::route(...)
   //   metrics | process::metrics::add(...)
   //
@@ -1334,22 +1329,17 @@ bool initialize(
   // graph shows what processes depend on which other processes.
   // Processes in the same vertical group can be safely started in any order.
   //
-  //   gc
-  //   |--help
-  //   |  |--metrics
-  //   |  |  |--system
-  //   |  |  |--job_object_manager (Windows only)
-  //   |  |  |--All other processes
-  //   |  |
-  //   |  |--logging
-  //   |  |--profiler
-  //   |  |--processesRoute
+  //   help
+  //   |--metrics
+  //   |  |--system
+  //   |  |--job_object_manager (Windows only)
+  //   |  |--All other processes
   //   |
-  //   |--authentication_manager
-
-  // Create global garbage collector process.
-  gc = new GarbageCollector();
-  spawn(gc);
+  //   |--logging
+  //   |--profiler
+  //   |--processesRoute
+  //
+  //   authenticator_manager
 
   // Create global help process.
   help = spawn(new Help(delegate), true);
@@ -1796,7 +1786,10 @@ void SocketManager::finalize()
   // We require all processes to be terminated prior to finalizing the
   // `SocketManager`. This simplifies the finalization logic as we do not
   // have to worry about sockets or links being created during cleanup.
-  CHECK(gc == nullptr);
+  //
+  // TODO(benh): can't do the following anymore, need another way:
+  //
+  // CHECK(gc == nullptr);
 
   int_fd socket = -1;
   // Close each socket.
@@ -2653,14 +2646,14 @@ void SocketManager::exited(const Address& address)
 
 void SocketManager::exited(ProcessBase* process)
 {
-  // An exited event is enough to cause the process to get deleted
-  // (e.g., by the garbage collector), which means we can't
-  // dereference process (or even use the address) after we enqueue at
-  // least one exited event. Thus, we save the process pid.
+  // TODO(benh): an exited event might cause `process` to get deleted
+  // (e.g., by someone who did a `link()`), even though they should
+  // really be doing a `wait()`. To be on the safe side here we save
+  // `process->pid` and the current time of the process so we can
+  // avoid dereferencing `process` after we enqueue at least one
+  // exited event. Really we should only store a `UPID` in these data
+  // structures and not use `ProcessBase` at all!
   const UPID pid = process->pid;
-
-  // Likewise, we need to save the current time of the process so we
-  // can update the clocks of linked processes as appropriate.
   const Time time = Clock::now(process);
 
   synchronized (mutex) {
@@ -2780,8 +2773,6 @@ ProcessManager::~ProcessManager() {}
 
 void ProcessManager::finalize()
 {
-  CHECK(gc != nullptr);
-
   // Prevent anymore processes from being spawned.
   finalizing.store(true);
 
@@ -2789,10 +2780,6 @@ void ProcessManager::finalize()
   // is erased from `processes` in ProcessManager::cleanup(). Don't hold
   // the lock or process the whole map as terminating one process might
   // trigger other terminations.
-  //
-  // We skip the GC process in this loop and instead terminate it last.
-  // This ensures that the GC process is running whenever we terminate
-  // any GC-managed process, which is necessary to prevent leaking.
   while (true) {
     // NOTE: We terminate by `UPID` rather than `ProcessBase` as the
     // process may terminate between the synchronized section below
@@ -2802,21 +2789,12 @@ void ProcessManager::finalize()
     UPID pid;
 
     synchronized (processes_mutex) {
-      ProcessBase* process = nullptr;
-
-      foreachvalue (ProcessBase* candidate, processes) {
-        if (candidate == gc) {
-          continue;
-        }
-
-        process = candidate;
-        pid = candidate->self();
+      if (processes.empty()) {
         break;
       }
 
-      if (process == nullptr) {
-        break;
-      }
+      // Grab the `UPID` for the next process we'll terminate.
+      pid = processes.values().front()->self();
     }
 
     // Terminate this process but do not inject the message,
@@ -2825,15 +2803,6 @@ void ProcessManager::finalize()
     process::wait(pid);
   }
 
-  // Terminate `gc`.
-  process::terminate(gc, false);
-  process::wait(gc);
-
-  synchronized (processes_mutex) {
-    delete gc;
-    gc = nullptr;
-  }
-
   // Send signal to all processing threads to stop running.
   joining_threads.store(true);
   runq.decomission();
@@ -3205,19 +3174,9 @@ bool ProcessManager::deliver(
 
 UPID ProcessManager::spawn(ProcessBase* process, bool manage)
 {
-  CHECK(process != nullptr);
-
-  if (process->state.load() != ProcessBase::State::BOTTOM) {
-    LOG(WARNING)
-      << "Attempted to spawn a process (" << process->self()
-      << ") that has already been initialized";
+  CHECK_NOTNULL(process);
 
-    if (manage) {
-      delete process;
-    }
-
-    return UPID();
-  }
+  bool spawned = false;
 
   // If the `ProcessManager` is cleaning itself up, no further processes
   // may be spawned.
@@ -3225,44 +3184,43 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
     LOG(WARNING)
       << "Attempted to spawn a process (" << process->self()
       << ") after finalizing libprocess!";
+  } else if (process->state.load() != ProcessBase::State::BOTTOM) {
+    LOG(WARNING)
+      << "Attempted to spawn a process (" << process->self()
+      << ") that has already been initialized";
+  } else {
+    synchronized (processes_mutex) {
+      if (processes.count(process->pid.id) > 0) {
+        LOG(WARNING)
+          << "Attempted to spawn already running process " << process->pid;
+      } else {
+        processes[process->pid.id] = process;
+
+        // NOTE: we set process reference on it's `UPID` _after_ we've
+        // spawned so that we make sure that we'll take the
+        // `ProcessManager::use()` code path in the event that we
+        // aren't able to spawn the process. This is important in
+        // circumstances where there are multiple processes with the
+        // same ID because the semantics that people have come to
+        // expect from libprocess is that a `UPID` should "resolve" to
+        // the already spawned process rather than a process that has
+        // the same name but hasn't yet been spawned.
+        process->pid.reference = process->reference;
+
+        spawned = true;
+      }
+    }
+  }
 
+  if (!spawned) {
     if (manage) {
       delete process;
     }
-
     return UPID();
   }
 
-  synchronized (processes_mutex) {
-    if (processes.count(process->pid.id) > 0) {
-      LOG(WARNING)
-        << "Attempted to spawn already running process " << process->pid;
-
-      // TODO(benh): we should be deleting `process` here, this is a
-      // long standing bug. This isn't straightforward because we
-      // can't delete it within the `synchronized (processes_mutex)`
-      // block.
-
-      return UPID();
-    } else {
-      processes[process->pid.id] = process;
-
-      // NOTE: we set process reference on it's `UPID` _after_ we've
-      // spawned so that we make sure that we'll take the
-      // `ProcessManager::use()` code path in the event that we aren't
-      // able to spawn the process. This is important in circumstances
-      // where there are multiple processes with the same ID because
-      // the semantics that people have come to expect from libprocess
-      // is that a `UPID` should "resolve" to the already spawned
-      // process rather than a process that has the same name but
-      // hasn't yet been spawned.
-      process->pid.reference = process->reference;
-    }
-  }
-
-  // Use the garbage collector if requested.
   if (manage) {
-    dispatch(gc->self(), &GarbageCollector::manage<ProcessBase>, process);
+    process->manage = true;
   }
 
   // We save the PID before enqueueing the process to avoid the race
@@ -3297,6 +3255,9 @@ void ProcessManager::resume(ProcessBase* process)
   if (state == ProcessBase::State::BOTTOM) {
     try { process->initialize(); }
     catch (...) { terminate = true; }
+
+    state = ProcessBase::State::READY;
+    process->state.store(state);
   }
 
   while (!terminate && !blocked) {
@@ -3393,18 +3354,27 @@ void ProcessManager::resume(ProcessBase* process)
       }
 
       delete event;
-
-      if (terminate) {
-        cleanup(process);
-      }
     }
   }
 
-  // TODO(benh): If `terminate` was set to true when we initialized
-  // then we'll never actually cleanup! This bug has been here a long
-  // time!!!
+  // Must read and store if we are managing `process` because in the
+  // event we are not managing `process` it might get deallocated
+  // after we open the gate in `ProcessManager::cleanup()` and thus
+  // can't be dereferenced.
+  bool manage = process->manage;
+
+  if (terminate) {
+    cleanup(process);
+  }
 
   __process__ = nullptr;
+
+  // Need to delete the process _after_ we've set `__process__` back
+  // to `nullptr` otherwise during destruction we might execute code
+  // that uses/dereferences `__process__` erroneously.
+  if (terminate && manage) {
+    delete process;
+  }
 }
 
 
@@ -3412,6 +3382,10 @@ void ProcessManager::cleanup(ProcessBase* process)
 {
   VLOG(2) << "Cleaning up " << process->pid;
 
+  // Invariant today is that all processes must be initialized and
+  // have their state transition to READY before being terminated.
+  CHECK(process->state.load() == ProcessBase::State::READY);
+
   // First, set the terminating state so no more events will get
   // enqueued and then decomission the event queue which will also
   // delete all the pending events. We want to delete the events
@@ -3456,36 +3430,37 @@ void ProcessManager::cleanup(ProcessBase* process)
     // Note that we don't remove the process from the clock during
     // cleanup, but rather the clock is reset for a process when it is
     // created (see ProcessBase::ProcessBase). We do this so that
-    // SocketManager::exited can access the current time of the
+    // `SocketManager::exited()` can access the current time of the
     // process to "order" exited events. TODO(benh): It might make
     // sense to consider storing the time of the process as a field of
-    // the class instead.
+    // the class instead. It probably also makes sense to pass the
+    // time to `SocketManager::exited()` rather than expect it to call
+    // into the clock.
 
     // Now we tell the socket manager about this process exiting so
     // that it can create exited events for linked processes. We
     // _must_ do this while synchronized on processes because
     // otherwise another process could attempt to link this process
-    // and SocketManager::link would see that the processes doesn't
-    // exist when it attempts to get a ProcessReference (since we
-    // removed the process above) thus causing an exited event, which
-    // could cause the process to get deleted (e.g., the garbage
-    // collector might link _after_ the process has already been
-    // removed from processes thus getting an exited event but we
-    // don't want that exited event to fire and actually delete the
-    // process until after we have used the process in
-    // SocketManager::exited).
+    // and `SocketManager::link()` would see that the processes
+    // doesn't exist when it attempts to get a `ProcessReference`
+    // (since we removed the process above) thus causing an exited
+    // event, which could cause the process to get deleted if someone
+    // is not properly doing a `wait()` but just waiting for exited
+    // events.
     socket_manager->exited(process);
 
     // ***************************************************************
-    // At this point we can no longer dereference the process since it
-    // might already be deallocated (e.g., by the garbage collector).
+    // At this point we should avoid dereferencing `process` since it
+    // might already be deallocated if some code is treating exited
+    // events (which were just sent above) as an indication that
+    // `process` has terminated _instead_ of calling `wait()` which
+    // will only return _after_ we open the gate below.
     // ***************************************************************
 
-    // Note that we need to open the gate while synchronized on
-    // processes because otherwise we might _open_ the gate before
-    // another thread _approaches_ the gate causing that thread to
-    // wait on _arrival_ to the gate forever (see
-    // ProcessManager::wait).
+    // Note that we need to open the gate within `synchronized
+    // (processes_mutex)` so that there is a happens-before
+    // relationship with respect to a process terminating and another
+    // process starting with the same `UPID`.
     CHECK(gate);
     gate->open();
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e04d3cf8/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 8d36600..82efb2f 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -37,7 +37,6 @@
 #include <process/executor.hpp>
 #include <process/filter.hpp>
 #include <process/future.hpp>
-#include <process/gc.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
 #include <process/network.hpp>