You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:50:03 UTC
[19/23] git commit: Added a '/__processes__' endpoint.
Added a '/__processes__' endpoint.
Review: https://reviews.apache.org/r/12528
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d6e399aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d6e399aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d6e399aa
Branch: refs/heads/vinod/0.13.0
Commit: d6e399aa827f990e3a4b228e561f15d5c35c4fb7
Parents: 5b4151e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 12 15:33:25 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jul 12 16:22:15 2013 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 142 ++++++++++++++++++++++++++++++-
1 file changed, 141 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d6e399aa/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 3ffe0b5..373097c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -295,6 +295,56 @@ private:
};
+// Helper for creating routes without a process. Constructing a Route
+// spawns an internal RouteProcess whose id is 'name' with the "/"
+// prefix removed and which forwards requests on its "/" route to
+// 'handler'. Destroying the Route terminates that process, waits for
+// it, and then frees it.
+// TODO(benh): Move this into route.hpp.
+class Route
+{
+public:
+  // name:    route name; the "/" prefix is removed to form the id of
+  //          the underlying process.
+  // help:    optional help text passed through to route().
+  // handler: invoked for every request received on the route.
+  Route(const string& name,
+        const Option<string>& help,
+        const lambda::function<Future<Response>(const Request&)>& handler)
+    : process(new RouteProcess(name, help, handler))
+  {
+    spawn(process);
+  }
+
+  ~Route()
+  {
+    terminate(process);
+    wait(process);
+
+    // The process was allocated in the constructor and is owned
+    // exclusively by this object, so release it once it has been
+    // waited on (previously it was leaked).
+    delete process;
+  }
+
+private:
+  // Not copyable or assignable: a copy would share 'process' and
+  // terminate/delete it a second time on destruction. Declared but
+  // intentionally left undefined.
+  Route(const Route&);
+  Route& operator = (const Route&);
+
+  // Process that registers the "/" route and delegates each request
+  // to the stored handler.
+  class RouteProcess : public Process<RouteProcess>
+  {
+  public:
+    RouteProcess(
+        const string& name,
+        const Option<string>& _help,
+        const lambda::function<Future<Response>(const Request&)>& _handler)
+      : ProcessBase(strings::remove(name, "/", strings::PREFIX)),
+        help(_help),
+        handler(_handler) {}
+
+  protected:
+    // Registers the single "/" route served by this process.
+    virtual void initialize()
+    {
+      route("/", help, &RouteProcess::handle);
+    }
+
+    // Forwards the request to the user supplied handler.
+    Future<Response> handle(const Request& request)
+    {
+      return handler(request);
+    }
+
+    const Option<string> help;
+    const lambda::function<Future<Response>(const Request&)> handler;
+  };
+
+  // Owned; allocated in the constructor, freed in the destructor.
+  RouteProcess* process;
+};
+
+
class SocketManager
{
public:
@@ -391,6 +441,9 @@ public:
void settle();
+ // The /__processes__ route.
+ Future<Response> __processes__(const Request&);
+
private:
// Delegate process name to receive root HTTP requests.
const string delegate;
@@ -1410,7 +1463,7 @@ void initialize(const string& delegate)
// TODO(bmahler): Investigate memory implications of this window
// size. We may also want to provide a maximum memory size rather than
// time window. Or, offload older data to disk, etc.
- process::statistics = new Statistics(Weeks(2));
+ statistics = new Statistics(Weeks(2));
// Initialize the mime types.
mime::initialize();
@@ -1418,6 +1471,12 @@ void initialize(const string& delegate)
// Initialize the response statuses.
http::initialize();
+ // Add a route for getting process information.
+ lambda::function<Future<Response>(const Request&)> __processes__ =
+ lambda::bind(&ProcessManager::__processes__, process_manager, lambda::_1);
+
+ new Route("/__processes__", None(), __processes__);
+
char temp[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
PLOG(FATAL) << "Failed to initialize, inet_ntop";
@@ -2313,6 +2372,7 @@ bool ProcessManager::deliver(
return true;
}
+
bool ProcessManager::deliver(
const UPID& to,
Event* event,
@@ -2769,6 +2829,86 @@ void ProcessManager::settle()
}
+// Handler for the '/__processes__' route. Responds with OK and a JSON
+// array containing one object per entry in 'processes'; each object
+// holds the process "id" and an "events" array with one object per
+// event currently in that process' event queue.
+Future<Response> ProcessManager::__processes__(const Request&)
+{
+  // Converts each visited event into a JSON object that is appended
+  // to the array supplied at construction.
+  struct JSONVisitor : EventVisitor
+  {
+    JSONVisitor(JSON::Array* _events) : events(_events) {}
+
+    virtual void visit(const MessageEvent& event)
+    {
+      JSON::Object object;
+      object.values["type"] = "MESSAGE";
+
+      const Message& message = *event.message;
+
+      object.values["name"] = message.name;
+      object.values["from"] = string(message.from);
+      object.values["to"] = string(message.to);
+      object.values["body"] = message.body;
+
+      events->values.push_back(object);
+    }
+
+    virtual void visit(const HttpEvent& event)
+    {
+      JSON::Object object;
+      object.values["type"] = "HTTP";
+
+      const Request& request = *event.request;
+
+      object.values["method"] = request.method;
+      object.values["url"] = request.url;
+
+      events->values.push_back(object);
+    }
+
+    // The remaining event kinds are reported by type only.
+    virtual void visit(const DispatchEvent&)
+    {
+      JSON::Object object;
+      object.values["type"] = "DISPATCH";
+      events->values.push_back(object);
+    }
+
+    virtual void visit(const ExitedEvent&)
+    {
+      JSON::Object object;
+      object.values["type"] = "EXITED";
+      events->values.push_back(object);
+    }
+
+    virtual void visit(const TerminateEvent&)
+    {
+      JSON::Object object;
+      object.values["type"] = "TERMINATE";
+      events->values.push_back(object);
+    }
+
+    JSON::Array* events;
+  };
+
+  JSON::Array array;
+
+  synchronized (processes) {
+    // Iterate the same member that is locked above rather than
+    // reaching it through the global 'process_manager'.
+    foreachvalue (const ProcessBase* process, processes) {
+      JSON::Object object;
+      object.values["id"] = process->pid.id;
+
+      JSON::Array events;
+      JSONVisitor visitor(&events);
+
+      // NOTE(review): the event queue is read here with no
+      // per-process synchronization visible in this function; confirm
+      // that holding 'processes' is sufficient to guard it.
+      foreach (Event* event, process->events) {
+        event->visit(&visitor);
+      }
+
+      object.values["events"] = events;
+      array.values.push_back(object);
+    }
+  }
+
+  return OK(array);
+}
+
+
Timer Timer::create(
const Duration& duration,
const lambda::function<void(void)>& thunk)