You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/08/01 21:03:06 UTC

[08/16] mesos git commit: Factored out the event queue and made `state` be atomic.

Factored out the event queue and made `state` be atomic.

This patch also introduces a function to construct a JSON
representation of a ProcessBase which is used to implement the
/__processes__ endpoint. We've then changed the implementation of that
endpoint to dispatch to each process so that each process inspects
its own event queue independently. In the future we may want to
introduce a high-priority queue to process events like these.

Review: https://reviews.apache.org/r/61055


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4330c086
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4330c086
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4330c086

Branch: refs/heads/master
Commit: 4330c08610833c7f02356872a85b65e631b94248
Parents: b28548f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jul 20 23:36:45 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Aug 1 14:01:51 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   1 +
 3rdparty/libprocess/include/process/event.hpp   |  58 ++++
 3rdparty/libprocess/include/process/process.hpp |  36 +--
 3rdparty/libprocess/src/CMakeLists.txt          |   1 +
 3rdparty/libprocess/src/event_queue.hpp         | 120 ++++++++
 3rdparty/libprocess/src/process.cpp             | 302 ++++++++++---------
 6 files changed, 356 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4330c086/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 378a434..7939ff7 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -164,6 +164,7 @@ libprocess_la_SOURCES =		\
   src/decoder.hpp		\
   src/encoder.hpp		\
   src/event_loop.hpp		\
+  src/event_queue.hpp		\
   src/firewall.cpp		\
   src/gate.hpp			\
   src/help.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/4330c086/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index a0ec053..6ae4207 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -21,6 +21,7 @@
 #include <process/socket.hpp>
 
 #include <stout/abort.hpp>
+#include <stout/json.hpp>
 #include <stout/lambda.hpp>
 
 namespace process {
@@ -81,6 +82,9 @@ struct Event
     }
     return *result;
   }
+
+  // JSON representation for an Event.
+  operator JSON::Object() const;
 };
 
 
@@ -209,6 +213,64 @@ private:
   TerminateEvent& operator=(const TerminateEvent&);
 };
 
+
+// Builds the JSON representation of this event. Every event gets a
+// "type" field ("MESSAGE", "HTTP", "DISPATCH", "EXITED" or
+// "TERMINATE"); message events additionally report "name", "from",
+// "to" and "body", and HTTP events report "method" and "url".
+inline Event::operator JSON::Object() const
+{
+  JSON::Object object;
+
+  struct Visitor : EventVisitor
+  {
+    explicit Visitor(JSON::Object* _object) : object(_object) {}
+
+    void visit(const MessageEvent& event) override
+    {
+      object->values["type"] = "MESSAGE";
+
+      const Message& message = event.message;
+
+      object->values["name"] = message.name;
+      object->values["from"] = stringify(message.from);
+      object->values["to"] = stringify(message.to);
+      object->values["body"] = message.body;
+    }
+
+    void visit(const HttpEvent& event) override
+    {
+      object->values["type"] = "HTTP";
+
+      const http::Request& request = *event.request;
+
+      object->values["method"] = request.method;
+      object->values["url"] = stringify(request.url);
+    }
+
+    void visit(const DispatchEvent&) override
+    {
+      object->values["type"] = "DISPATCH";
+    }
+
+    void visit(const ExitedEvent&) override
+    {
+      object->values["type"] = "EXITED";
+    }
+
+    void visit(const TerminateEvent&) override
+    {
+      object->values["type"] = "TERMINATE";
+    }
+
+    JSON::Object* object;
+  } visitor(&object);
+
+  visit(&visitor);
+
+  return object;
+}
+
 } // namespace process {
 
 #endif // __PROCESS_EVENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/4330c086/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 063a01e..724e177 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -41,6 +41,7 @@
 namespace process {
 
 // Forward declaration.
+class EventQueue;
 class Gate;
 class Logging;
 class Sequence;
@@ -379,20 +380,12 @@ protected:
   }
 
   /**
-   * Returns the number of events of the given type currently on the event
-   * queue.
+   * Returns the number of events of the given type currently on the
+   * event queue. MUST be invoked from within the process itself in
+   * order to safely examine events.
    */
   template <typename T>
-  size_t eventCount()
-  {
-    size_t count = 0U;
-
-    synchronized (mutex) {
-      count = std::count_if(events.begin(), events.end(), isEventType<T>);
-    }
-
-    return count;
-  }
+  size_t eventCount();
 
 private:
   friend class SocketManager;
@@ -405,19 +398,15 @@ private:
   // Transitioning from BLOCKED to READY also requires enqueueing the
   // process in the run queue otherwise the events will never be
   // processed!
-  enum
+  enum class State
   {
     BOTTOM, // Uninitialized but events may be enqueued.
     BLOCKED, // Initialized, no events enqueued.
     READY, // Initialized, events enqueued.
     TERMINATING // Initialized, no more events will be enqueued.
-  } state;
+  };
 
-  template <typename T>
-  static bool isEventType(const Event* event)
-  {
-    return event->is<T>();
-  }
+  std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);
 
   // Mutex protecting internals.
   // TODO(benh): Consider replacing with a spinlock, on multi-core systems.
@@ -476,11 +465,16 @@ private:
       const std::string& name,
       const Owned<http::Request>& request);
 
+  // JSON representation of process. MUST be invoked from within the
+  // process itself in order to safely examine events.
+  operator JSON::Object();
+
   // Static assets(s) to provide.
   std::map<std::string, Asset> assets;
 
-  // Queue of received events, requires lock()ed access!
-  std::deque<Event*> events;
+  // Queue of received events. We employ the PIMPL idiom here and use
+  // a pointer so we can hide the implementation of `EventQueue`.
+  std::unique_ptr<EventQueue> events;
 
   // Active references.
   std::atomic_long refs;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4330c086/3rdparty/libprocess/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/CMakeLists.txt b/3rdparty/libprocess/src/CMakeLists.txt
index f97291b..8d2150e 100644
--- a/3rdparty/libprocess/src/CMakeLists.txt
+++ b/3rdparty/libprocess/src/CMakeLists.txt
@@ -39,6 +39,7 @@ set(PROCESS_SRC
   decoder.hpp
   encoder.hpp
   event_loop.hpp
+  event_queue.hpp
   firewall.cpp
   gate.hpp
   help.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/4330c086/3rdparty/libprocess/src/event_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
new file mode 100644
index 0000000..37d2359
--- /dev/null
+++ b/3rdparty/libprocess/src/event_queue.hpp
@@ -0,0 +1,152 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_EVENT_QUEUE_HPP__
+#define __PROCESS_EVENT_QUEUE_HPP__
+
+#include <algorithm>
+#include <deque>
+#include <mutex>
+#include <string>
+
+#include <process/event.hpp>
+#include <process/http.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/json.hpp>
+#include <stout/stringify.hpp>
+#include <stout/synchronized.hpp>
+
+namespace process {
+
+// TODO(benh): Document the contract/semantics that an event queue is
+// designed for _multiple_ producers and a _single_ consumer, and that
+// this requirement is currently neither enforced nor checked at
+// runtime.
+
+// Queue of pending events for a single process, guarded by `mutex`.
+//
+// The queue takes ownership of every `Event*` passed to `enqueue()`.
+// A non-null event returned by `dequeue()` is owned by the caller.
+// Events still pending when `decomission()` is called, and events
+// enqueued afterwards, are deleted by the queue.
+class EventQueue
+{
+public:
+  // Appends `event` to the back of the queue, or prepends it to the
+  // front when `inject` is true so that it is dequeued before any
+  // other pending event. Deletes `event` instead if the queue has
+  // already been decomissioned.
+  void enqueue(Event* event, bool inject = false)
+  {
+    bool enqueued = false;
+    synchronized (mutex) {
+      if (!decomissioned) {
+        if (inject) {
+          events.push_front(event);
+        } else {
+          events.push_back(event);
+        }
+        enqueued = true;
+      }
+    }
+
+    // Delete outside of the critical section since deleting an event
+    // may execute code outside of libprocess.
+    if (!enqueued) {
+      delete event;
+    }
+  }
+
+  // Removes and returns the event at the front of the queue, or
+  // returns `nullptr` if the queue is empty.
+  Event* dequeue()
+  {
+    synchronized (mutex) {
+      if (!events.empty()) {
+        Event* event = events.front();
+        events.pop_front();
+        return event;
+      }
+    }
+
+    // TODO(benh): The current semantics are that a call to dequeue
+    // will always be coupled by a call to empty and only get called
+    // if we're not empty which implies we should never return nullptr
+    // here, should we check this?
+    return nullptr;
+  }
+
+  // Returns true if there are no pending events.
+  bool empty()
+  {
+    synchronized (mutex) {
+      return events.empty();
+    }
+  }
+
+  // Stops accepting events (subsequent `enqueue()` calls delete their
+  // event) and deletes every event that is still pending.
+  void decomission()
+  {
+    std::deque<Event*> pending;
+
+    synchronized (mutex) {
+      decomissioned = true;
+      pending.swap(events);
+    }
+
+    // Delete the pending events only after releasing `mutex`: deleting
+    // an event may execute code outside of libprocess, and holding the
+    // lock while doing so risks a deadlock (e.g., if that code tries to
+    // enqueue onto this queue).
+    foreach (Event* event, pending) {
+      delete event;
+    }
+  }
+
+  // Returns the number of pending events of type `T`.
+  template <typename T>
+  size_t count()
+  {
+    synchronized (mutex) {
+      return std::count_if(
+          events.begin(),
+          events.end(),
+          [](const Event* event) {
+            return event->is<T>();
+          });
+    }
+  }
+
+  // JSON representation of the pending events, front to back.
+  operator JSON::Array()
+  {
+    JSON::Array array;
+    synchronized (mutex) {
+      foreach (Event* event, events) {
+        JSON::Object object = *event;
+        array.values.push_back(object);
+      }
+    }
+    return array;
+  }
+
+private:
+  std::mutex mutex;
+  std::deque<Event*> events;
+  bool decomissioned = false;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_EVENT_QUEUE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/4330c086/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 5665171..d65941d 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -63,6 +63,7 @@
 #include <process/address.hpp>
 #include <process/check.hpp>
 #include <process/clock.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
@@ -113,6 +114,7 @@
 #include "decoder.hpp"
 #include "encoder.hpp"
 #include "event_loop.hpp"
+#include "event_queue.hpp"
 #include "gate.hpp"
 #include "process_reference.hpp"
 #include "run_queue.hpp"
@@ -3180,10 +3182,12 @@ void ProcessManager::resume(ProcessBase* process)
   bool terminate = false;
   bool blocked = false;
 
-  CHECK(process->state == ProcessBase::BOTTOM ||
-        process->state == ProcessBase::READY);
+  ProcessBase::State state = process->state.load();
 
-  if (process->state == ProcessBase::BOTTOM) {
+  CHECK(state == ProcessBase::State::BOTTOM ||
+        state == ProcessBase::State::READY);
+
+  if (state == ProcessBase::State::BOTTOM) {
     try { process->initialize(); }
     catch (...) { terminate = true; }
   }
@@ -3191,18 +3195,37 @@ void ProcessManager::resume(ProcessBase* process)
   while (!terminate && !blocked) {
     Event* event = nullptr;
 
-    synchronized (process->mutex) {
-      if (process->events.size() > 0) {
-        event = process->events.front();
-        process->events.pop_front();
-      } else {
-        process->state = ProcessBase::BLOCKED;
-        blocked = true;
+    if (!process->events->empty()) {
+      event = process->events->dequeue();
+    } else {
+      state = ProcessBase::State::BLOCKED;
+      process->state.store(state);
+      blocked = true;
+
+      // Now check that we didn't miss any events that got added
+      // before we set ourselves to BLOCKED since we won't have been
+      // added to the run queue in those circumstances so we need to
+      // serve those events!
+      if (!process->events->empty()) {
+        // Make sure the state is in READY! Either we need to
+        // explicitly do this because `ProcessBase::enqueue` saw us as
+        // READY (or BOTTOM) and didn't change the state or we're
+        // racing with `ProcessBase::enqueue` because they saw us at
+        // BLOCKED and are trying to change the state. If they change
+        // the state then they'll also enqueue this process, which
+        // means we need to bail because another thread might resume
+        // (and the reason we'll bail is because `blocked` is true)!
+        if (process->state.compare_exchange_strong(
+                state,
+                ProcessBase::State::READY)) {
+          blocked = false;
+          continue;
+        }
       }
     }
 
     if (!blocked) {
-      CHECK(event != nullptr);
+      CHECK_NOTNULL(event);
 
       // Determine if we should filter this event.
       //
@@ -3253,6 +3276,10 @@ void ProcessManager::resume(ProcessBase* process)
     }
   }
 
+  // TODO(benh): If `terminate` was set to true when we initialized
+  // then we'll never actually cleanup! This bug has been here a long
+  // time!!!
+
   __process__ = nullptr;
 }
 
@@ -3262,29 +3289,20 @@ void ProcessManager::cleanup(ProcessBase* process)
   VLOG(2) << "Cleaning up " << process->pid;
 
   // First, set the terminating state so no more events will get
-  // enqueued and delete all the pending events. We want to delete the
-  // events before we hold the processes lock because deleting an
-  // event could cause code outside libprocess to get executed which
-  // might cause a deadlock with the processes lock. Likewise,
-  // deleting the events now rather than later has the nice property
-  // of making sure that any events that might have gotten enqueued on
-  // the process we are cleaning up will get dropped (since it's
-  // terminating) and eliminates the potential of enqueueing them on
-  // another process that gets spawned with the same PID.
-  deque<Event*> events;
-
-  synchronized (process->mutex) {
-    process->state = ProcessBase::TERMINATING;
-    events = process->events;
-    process->events.clear();
-  }
-
-  // Delete pending events.
-  while (!events.empty()) {
-    Event* event = events.front();
-    events.pop_front();
-    delete event;
-  }
+  // enqueued and then decomission the event queue which will also
+  // delete all the pending events. We want to delete the events
+  // before we hold `processes_mutex` because deleting an event could
+  // cause code outside libprocess to get executed which might cause a
+  // deadlock with `processes_mutex`. Also, deleting the events now
+  // rather than later has the nice property of making sure that any
+  // _new_ events that might have gotten enqueued _BACK_ onto this
+  // process due to the deleting of the pending events will get
+  // dropped since this process is now TERMINATING, which eliminates
+  // the potential of these new events from getting enqueued onto a
+  // _new_ process that gets spawned with the same PID.
+  process->state.store(ProcessBase::State::TERMINATING);
+
+  process->events->decomission();
 
   // Remove help strings for all installed routes for this process.
   dispatch(help, &Help::remove, process->pid.id);
@@ -3302,7 +3320,7 @@ void ProcessManager::cleanup(ProcessBase* process)
     }
 
     synchronized (process->mutex) {
-      CHECK(process->events.empty());
+      CHECK(process->events->empty());
 
       processes.erase(process->pid.id);
 
@@ -3397,16 +3415,16 @@ bool ProcessManager::wait(const UPID& pid)
 
   ProcessBase* process = nullptr; // Set to non-null if we donate thread.
 
-  // Try and approach the gate if necessary.
-  synchronized (processes_mutex) {
-    if (processes.count(pid.id) > 0) {
-      process = processes[pid.id];
+  if (ProcessReference reference = use(pid)) {
+    // Save the process assuming we can donate to it.
+    process = reference;
 
-      gate = process->gate;
+    gate = process->gate;
 
-      // Check if it is runnable in order to donate this thread.
-      if (process->state == ProcessBase::BOTTOM ||
-          process->state == ProcessBase::READY) {
+    // Check if it is runnable in order to donate this thread.
+    switch (process->state.load()) {
+      case ProcessBase::State::BOTTOM:
+      case ProcessBase::State::READY:
         // Assume that we'll be able to successfully extract the
         // process from the run queue and optimistically increment
         // `running` so that `Clock::settle` properly waits. In the
@@ -3424,10 +3442,11 @@ bool ProcessManager::wait(const UPID& pid)
           running.fetch_sub(1);
           process = nullptr;
         }
-      } else {
-        // Process is not runnable, so no need to donate ...
+        break;
+      case ProcessBase::State::BLOCKED:
+      case ProcessBase::State::TERMINATING:
         process = nullptr;
-      }
+        break;
     }
   }
 
@@ -3612,96 +3631,36 @@ void ProcessManager::settle()
 
 Future<Response> ProcessManager::__processes__(const Request&)
 {
-  JSON::Array array;
-
   synchronized (processes_mutex) {
-    foreachvalue (ProcessBase* process, process_manager->processes) {
-      JSON::Object object;
-      object.values["id"] = process->pid.id;
-
-      JSON::Array events;
-
-      struct JSONVisitor : EventVisitor
-      {
-        explicit JSONVisitor(JSON::Array* _events) : events(_events) {}
-
-        virtual void visit(const MessageEvent& event)
-        {
-          JSON::Object object;
-          object.values["type"] = "MESSAGE";
-
-          const Message& message = event.message;
-
-          object.values["name"] = message.name;
-          object.values["from"] = string(message.from);
-          object.values["to"] = string(message.to);
-          object.values["body"] = message.body;
-
-          events->values.push_back(object);
-        }
-
-        virtual void visit(const HttpEvent& event)
-        {
-          JSON::Object object;
-          object.values["type"] = "HTTP";
-
-          const Request& request = *event.request;
-
-          object.values["method"] = request.method;
-          object.values["url"] = stringify(request.url);
-
-          events->values.push_back(object);
-        }
-
-        virtual void visit(const DispatchEvent& event)
-        {
-          JSON::Object object;
-          object.values["type"] = "DISPATCH";
-          events->values.push_back(object);
-        }
-
-        virtual void visit(const ExitedEvent& event)
-        {
-          JSON::Object object;
-          object.values["type"] = "EXITED";
-          events->values.push_back(object);
-        }
-
-        virtual void visit(const TerminateEvent& event)
-        {
-          JSON::Object object;
-          object.values["type"] = "TERMINATE";
-          events->values.push_back(object);
-        }
-
-        JSON::Array* events;
-      } visitor(&events);
-
-      synchronized (process->mutex) {
-        foreach (Event* event, process->events) {
-          event->visit(&visitor);
+    return collect(lambda::map(
+        [](ProcessBase* process) {
+          // TODO(benh): Try and "inject" this dispatch or create a
+          // high-priority set of events (i.e., mailbox).
+          return dispatch(
+              process->self(),
+              [process]() -> JSON::Object {
+                return *process;
+              });
+        },
+        process_manager->processes.values()))
+      .then([](const std::list<JSON::Object>& objects) -> Response {
+        JSON::Array array;
+        foreach (const JSON::Object& object, objects) {
+          array.values.push_back(object);
         }
-      }
-
-      object.values["events"] = events;
-      array.values.push_back(object);
-    }
+        return OK(array);
+      });
   }
-
-  return OK(array);
 }
 
 
 ProcessBase::ProcessBase(const string& id)
+  : events(new EventQueue()),
+    refs(0),
+    gate(std::make_shared<Gate>())
 {
   process::initialize();
 
-  state = ProcessBase::BOTTOM;
-
-  refs = 0;
-
-  gate = std::make_shared<Gate>();
-
   pid.id = id != "" ? id : ID::generate();
   pid.address = __address__;
   pid.addresses.v6 = __address6__;
@@ -3718,26 +3677,76 @@ ProcessBase::ProcessBase(const string& id)
 ProcessBase::~ProcessBase() {}
 
 
-void ProcessBase::enqueue(Event* event, bool inject)
+template <>
+size_t ProcessBase::eventCount<MessageEvent>()
 {
-  CHECK(event != nullptr);
+  CHECK_EQ(this, __process__);
+  return events->count<MessageEvent>();
+}
 
-  synchronized (mutex) {
-    if (state != TERMINATING) {
-      if (!inject) {
-        events.push_back(event);
-      } else {
-        events.push_front(event);
-      }
 
-      if (state == BLOCKED) {
-        state = READY;
-        process_manager->enqueue(this);
-      }
+template <>
+size_t ProcessBase::eventCount<DispatchEvent>()
+{
+  CHECK_EQ(this, __process__);
+  return events->count<DispatchEvent>();
+}
 
-      CHECK(state == BOTTOM || state == READY);
-    } else {
+
+template <>
+size_t ProcessBase::eventCount<HttpEvent>()
+{
+  CHECK_EQ(this, __process__);
+  return events->count<HttpEvent>();
+}
+
+
+template <>
+size_t ProcessBase::eventCount<ExitedEvent>()
+{
+  CHECK_EQ(this, __process__);
+  return events->count<ExitedEvent>();
+}
+
+
+template <>
+size_t ProcessBase::eventCount<TerminateEvent>()
+{
+  CHECK_EQ(this, __process__);
+  return events->count<TerminateEvent>();
+}
+
+
+void ProcessBase::enqueue(Event* event, bool inject)
+{
+  CHECK_NOTNULL(event);
+
+  State old = state.load();
+
+  switch (old) {
+    case State::BOTTOM:
+    case State::READY:
+    case State::BLOCKED:
+      events->enqueue(event, inject);
+      break;
+    case State::TERMINATING:
       delete event;
+      return;
+  }
+
+  // If we're BLOCKED then we need to try and enqueue us into the run
+  // queue. It's possible that in the time we enqueued the event and
+  // are attempting to enqueue us in the run queue another thread has
+  // already served the event! Worst case scenario we'll end up
+  // enqueueing us in the run queue only to find out in
+  // `ProcessManager::resume` that our event queue is empty!
+  if ((old = state.load()) == State::BLOCKED) {
+    if (state.compare_exchange_strong(old, State::READY)) {
+      // NOTE: we only enqueue if _we_ successfully did the exchange
+      // since another thread executing this code or a thread in
+      // `ProcessManager::resume` might have already done the exchange
+      // to READY.
+      process_manager->enqueue(this);
     }
   }
 }
@@ -4048,6 +4057,21 @@ void ProcessBase::route(
 }
 
 
+// JSON representation of this process: its `id` and the events that
+// are currently pending in its event queue. MUST be invoked from
+// within the process itself (see the CHECK below) in order to safely
+// examine events.
+ProcessBase::operator JSON::Object()
+{
+  CHECK_EQ(this, __process__);
+
+  JSON::Object object;
+  object.values["id"] = pid.id;
+  object.values["events"] = JSON::Array(*events);
+  return object;
+}
+
+
 UPID spawn(ProcessBase* process, bool manage)
 {
   process::initialize();