You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:50:03 UTC

[19/23] git commit: Added a '/__processes__' endpoint.

Added a '/__processes__' endpoint.

Review: https://reviews.apache.org/r/12528


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d6e399aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d6e399aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d6e399aa

Branch: refs/heads/vinod/0.13.0
Commit: d6e399aa827f990e3a4b228e561f15d5c35c4fb7
Parents: 5b4151e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 12 15:33:25 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jul 12 16:22:15 2013 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 142 ++++++++++++++++++++++++++++++-
 1 file changed, 141 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d6e399aa/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 3ffe0b5..373097c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -295,6 +295,56 @@ private:
 };
 
 
+// Helper for creating routes without a process: a Route spawns its own RouteProcess that serves 'handler'.
+// TODO(benh): Move this into route.hpp.
+class Route
+{
+public:
+  Route(const string& name,
+        const Option<string>& help,
+        const lambda::function<Future<Response>(const Request&)>& handler)
+  {
+    process = new RouteProcess(name, help, handler);
+    spawn(process);
+  }
+
+  ~Route()
+  {
+    terminate(process);
+    wait(process);  // NOTE(review): 'process' is not deleted after the wait -- confirm whether that is intended.
+  }
+
+private:
+  class RouteProcess : public Process<RouteProcess>  // Forwards each Request routed to it to 'handler'.
+  {
+  public:
+    RouteProcess(
+        const string& name,
+        const Option<string>& _help,
+        const lambda::function<Future<Response>(const Request&)>& _handler)
+      : ProcessBase(strings::remove(name, "/", strings::PREFIX)),  // Process id: 'name' with (presumably) its leading "/" removed.
+        help(_help),
+        handler(_handler) {}
+
+  protected:
+    virtual void initialize()
+    {
+      route("/", help, &RouteProcess::handle);  // Register handle() for this process's "/" path.
+    }
+
+    Future<Response> handle(const Request& request)
+    {
+      return handler(request);  // Delegate to the callback supplied to Route.
+    }
+
+    const Option<string> help;  // Passed to route() in initialize().
+    const lambda::function<Future<Response>(const Request&)> handler;  // Callback invoked by handle().
+  };
+
+  RouteProcess* process;  // Created and spawned in the constructor; terminated and waited on in the destructor.
+};
+
+
 class SocketManager
 {
 public:
@@ -391,6 +441,9 @@ public:
 
   void settle();
 
+  // The /__processes__ route.
+  Future<Response> __processes__(const Request&);
+
 private:
   // Delegate process name to receive root HTTP requests.
   const string delegate;
@@ -1410,7 +1463,7 @@ void initialize(const string& delegate)
   // TODO(bmahler): Investigate memory implications of this window
   // size. We may also want to provide a maximum memory size rather than
   // time window. Or, offload older data to disk, etc.
-  process::statistics = new Statistics(Weeks(2));
+  statistics = new Statistics(Weeks(2));
 
   // Initialize the mime types.
   mime::initialize();
@@ -1418,6 +1471,12 @@ void initialize(const string& delegate)
   // Initialize the response statuses.
   http::initialize();
 
+  // Add a route for getting process information.
+  lambda::function<Future<Response>(const Request&)> __processes__ =
+    lambda::bind(&ProcessManager::__processes__, process_manager, lambda::_1);
+
+  new Route("/__processes__", None(), __processes__);
+
   char temp[INET_ADDRSTRLEN];
   if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
     PLOG(FATAL) << "Failed to initialize, inet_ntop";
@@ -2313,6 +2372,7 @@ bool ProcessManager::deliver(
   return true;
 }
 
+
 bool ProcessManager::deliver(
     const UPID& to,
     Event* event,
@@ -2769,6 +2829,86 @@ void ProcessManager::settle()
 }
 
 
+Future<Response> ProcessManager::__processes__(const Request&)  // Responds OK with a JSON array: one {"id", "events"} object per process.
+{
+  JSON::Array array;
+
+  synchronized (processes) {  // NOTE(review): locks 'processes' but iterates 'process_manager->processes' -- presumably the same member; confirm.
+    foreachvalue (const ProcessBase* process, process_manager->processes) {
+      JSON::Object object;
+      object.values["id"] = process->pid.id;
+
+      JSON::Array events;
+
+      struct JSONVisitor : EventVisitor  // Appends one JSON object per visited event to '*events'.
+      {
+        JSONVisitor(JSON::Array* _events) : events(_events) {}
+
+        virtual void visit(const MessageEvent& event)  // Records name, from, to and body of the message.
+        {
+          JSON::Object object;
+          object.values["type"] = "MESSAGE";
+
+          const Message& message = *event.message;
+
+          object.values["name"] = message.name;
+          object.values["from"] = string(message.from);
+          object.values["to"] = string(message.to);
+          object.values["body"] = message.body;
+
+          events->values.push_back(object);
+        }
+
+        virtual void visit(const HttpEvent& event)  // Records method and url of the request.
+        {
+          JSON::Object object;
+          object.values["type"] = "HTTP";
+
+          const Request& request = *event.request;
+
+          object.values["method"] = request.method;
+          object.values["url"] = request.url;
+
+          events->values.push_back(object);
+        }
+
+        virtual void visit(const DispatchEvent& event)  // Only the event type is recorded.
+        {
+          JSON::Object object;
+          object.values["type"] = "DISPATCH";
+          events->values.push_back(object);
+        }
+
+        virtual void visit(const ExitedEvent& event)  // Only the event type is recorded.
+        {
+          JSON::Object object;
+          object.values["type"] = "EXITED";
+          events->values.push_back(object);
+        }
+
+        virtual void visit(const TerminateEvent& event)  // Only the event type is recorded.
+        {
+          JSON::Object object;
+          object.values["type"] = "TERMINATE";
+          events->values.push_back(object);
+        }
+
+        JSON::Array* events;  // Output array; points at the per-process 'events' local.
+      } visitor(&events);
+
+      foreach (Event* event, process->events) {  // Serialize every event in the process's 'events' container.
+        event->visit(&visitor);
+      }
+
+      object.values["events"] = events;
+      array.values.push_back(object);
+    }
+  }
+
+  return OK(array);
+}
+
+
 Timer Timer::create(
     const Duration& duration,
     const lambda::function<void(void)>& thunk)