You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/12/05 00:27:30 UTC
[2/3] mesos git commit: Added functionality for Subscribe/Subscribed
workflow for HTTP executors.
Added functionality for Subscribe/Subscribed workflow for HTTP
executors.
This change lets executors `Subscribe` via the
`api/v1/executor` endpoint. If framework checkpointing is enabled, the
`Subscribe` call also writes a marker file identifying the executor as
HTTP based. During recovery, the agent can use this marker to know that it
should wait for the executor to reconnect.
Review: https://reviews.apache.org/r/38877
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2c3faa8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2c3faa8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2c3faa8
Branch: refs/heads/master
Commit: a2c3faa8a08f60381c8b11c40fc51d1c877e749b
Parents: ebff7ce
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:25:11 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:25:13 2015 -0800
----------------------------------------------------------------------
src/slave/http.cpp | 3 +
src/slave/slave.cpp | 206 ++++++++++++++++++++++++++++++++++++++++++++---
src/slave/slave.hpp | 7 ++
3 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index eeebc79..cef568d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -308,6 +308,9 @@ Future<Response> Slave::Http::executor(const Request& request) const
ok.type = Response::PIPE;
ok.reader = pipe.reader();
+ HttpConnection http {pipe.writer(), responseContentType};
+ slave->subscribe(http, call.subscribe(), framework, executor);
+
return ok;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a33187e..9bd86e1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -87,6 +87,8 @@
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+using mesos::executor::Call;
+
using mesos::slave::QoSController;
using mesos::slave::QoSCorrection;
using mesos::slave::ResourceEstimator;
@@ -2398,6 +2400,176 @@ void Slave::_statusUpdateAcknowledgement(
}
+// Handles a `Subscribe` call from an HTTP executor. `Slave::Http::executor`
+// invokes this with an `HttpConnection` wrapping the writer end of the
+// response pipe that it returns to the executor.
+//
+// If the agent or the framework is terminating, or the executor is already
+// TERMINATING/TERMINATED, the executor is sent a `ShutdownExecutorMessage`
+// and `http` is closed. Otherwise (executor RUNNING or REGISTERING) any
+// previous HTTP connection is closed, the executor is bound to `http` and
+// marked RUNNING, and then:
+//   - if the framework checkpoints, an HTTP marker file is created under
+//     `metaDir` so that a recovering agent knows this executor is HTTP based;
+//   - an `ExecutorRegisteredMessage` is sent to the executor;
+//   - every update carried in `subscribe.updates()` is fed to statusUpdate();
+//   - the container's resource limits are updated to include queued tasks,
+//     after which the queued tasks are handed to runTasks();
+//   - every launched task still in TASK_STAGING that is absent from
+//     `subscribe.tasks()` is transitioned to TASK_LOST.
+//
+// @param http       Connection to the executor; stored on the executor on
+//                   success, closed when the subscription is rejected.
+// @param subscribe  The `Subscribe` call payload (unacknowledged updates and
+//                   unacknowledged tasks known to the executor).
+// @param framework  Owning framework; must not be null.
+// @param executor   The subscribing executor; must not be null.
+//
+// CHECK-fails unless the agent is DISCONNECTED, RUNNING or TERMINATING and
+// the framework is RUNNING or TERMINATING; LOG(FATAL)s on any executor state
+// other than TERMINATING, TERMINATED, RUNNING or REGISTERING.
+// NOTE(review): the agent's RECOVERING state is not accepted here; presumably
+// the HTTP handler rejects executor calls until recovery finishes -- confirm.
+void Slave::subscribe(
+ HttpConnection http,
+ const Call::Subscribe& subscribe,
+ Framework* framework,
+ Executor* executor)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(executor);
+
+ LOG(INFO) << "Received Subscribe request for HTTP executor " << *executor;
+
+ CHECK(state == DISCONNECTED || state == RUNNING ||
+ state == TERMINATING) << state;
+
+ if (state == TERMINATING) {
+ LOG(WARNING) << "Shutting down executor " << *executor << " as the slave "
+ << "is terminating";
+ http.send(ShutdownExecutorMessage());
+ http.close();
+ return;
+ }
+
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING)
+ << framework->state;
+
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Shutting down executor " << *executor << " as the "
+ << "framework is terminating";
+ http.send(ShutdownExecutorMessage());
+ http.close();
+ return;
+ }
+
+ switch (executor->state) {
+ case Executor::TERMINATING:
+ case Executor::TERMINATED:
+ // TERMINATED is possible if the executor forks, the parent process
+ // terminates and the child process (driver) tries to register!
+ LOG(WARNING) << "Shutting down executor " << *executor
+ << " because it is in unexpected state " << executor->state;
+ http.send(ShutdownExecutorMessage());
+ http.close();
+ break;
+ case Executor::RUNNING:
+ case Executor::REGISTERING: {
+ // Close the earlier connection if one existed. This can even
+ // be a retried Subscribe request from an already connected
+ // executor.
+ if (executor->http.isSome()) {
+ LOG(WARNING) << "Closing already existing HTTP connection from "
+ << "executor " << *executor;
+ executor->http->close();
+ }
+
+ executor->state = Executor::RUNNING;
+
+ // Save the connection for the executor.
+ executor->http = http;
+ // HTTP executors have no libprocess PID; a None() pid is what marks an
+ // executor as HTTP based (the same convention Framework::recoverExecutor
+ // uses when recovering an HTTP executor).
+ executor->pid = None();
+
+ if (framework->info.checkpoint()) {
+ // Write a marker file to indicate that this executor
+ // is HTTP based.
+ const string path = paths::getExecutorHttpMarkerPath(
+ metaDir,
+ info.id(),
+ framework->id(),
+ executor->id,
+ executor->containerId);
+
+ LOG(INFO) << "Creating a marker file for HTTP based executor "
+ << *executor << " at path '" << path << "'";
+ CHECK_SOME(os::touch(path));
+ }
+
+ // Tell executor it's registered and give it any queued tasks.
+ // NOTE(review): executor->send() is assumed to deliver over
+ // `executor->http` now that it is set above -- confirm in Executor::send.
+ ExecutorRegisteredMessage message;
+ message.mutable_executor_info()->MergeFrom(executor->info);
+ message.mutable_framework_id()->MergeFrom(framework->id());
+ message.mutable_framework_info()->MergeFrom(framework->info);
+ message.mutable_slave_id()->MergeFrom(info.id());
+ message.mutable_slave_info()->MergeFrom(info);
+ executor->send(message);
+
+ // Handle all the pending updates.
+ // The status update manager might have already checkpointed some
+ // of these pending updates (for example, if the slave died right
+ // after it checkpointed the update but before it could send the
+ // ACK to the executor). This is ok because the status update
+ // manager correctly handles duplicate updates.
+ foreach (const Call::Update& update, subscribe.updates()) {
+ // NOTE: This also updates the executor's resources!
+ statusUpdate(protobuf::createStatusUpdate(
+ framework->id(),
+ update.status(),
+ info.id()),
+ None());
+ }
+
+ // Update the resource limits for the container. Note that the
+ // resource limits include the currently queued tasks because we
+ // want the container to have enough resources to hold the
+ // upcoming tasks.
+ Resources resources = executor->resources;
+
+ // TODO(jieyu): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, executor->queuedTasks.values()) {
+ resources += task.resources();
+ }
+
+ // The queued tasks are launched via runTasks() only once the
+ // containerizer update completes (onAny: success or failure).
+ containerizer->update(executor->containerId, resources)
+ .onAny(defer(self(),
+ &Self::runTasks,
+ lambda::_1,
+ framework->id(),
+ executor->id,
+ executor->containerId,
+ executor->queuedTasks.values()));
+
+ // Index the tasks the executor reported as unacknowledged by task ID.
+ hashmap<TaskID, TaskInfo> unackedTasks;
+ foreach (const TaskInfo& task, subscribe.tasks()) {
+ unackedTasks[task.task_id()] = task;
+ }
+
+ // Now, if there is any task still in STAGING state and not in
+ // unacknowledged 'tasks' known to the executor, the slave must
+ // have died before the executor received the task! We should
+ // transition it to TASK_LOST. We only consider/store
+ // unacknowledged 'tasks' at the executor driver because if a
+ // task has been acknowledged, the slave must have received
+ // an update for that task and transitioned it out of STAGING!
+ // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
+ // 'Task' so that we can relaunch such tasks! Currently we
+ // don't do it because 'TaskInfo.data' could be huge.
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor->launchedTasks.values()) {
+ if (task->state() == TASK_STAGING &&
+ !unackedTasks.contains(task->task_id())) {
+ LOG(INFO) << "Transitioning STAGED task " << task->task_id()
+ << " to LOST because it is unknown to the executor "
+ << executor->id;
+
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ framework->id(),
+ info.id(),
+ task->task_id(),
+ TASK_LOST,
+ TaskStatus::SOURCE_SLAVE,
+ UUID::random(),
+ "Task launched during slave restart",
+ TaskStatus::REASON_SLAVE_RESTARTED,
+ executor->id);
+
+ statusUpdate(update, UPID());
+ }
+ }
+
+ break;
+ }
+ default:
+ LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
+ << executor->state;
+ break;
+ }
+}
+
+
void Slave::registerExecutor(
const UPID& from,
const FrameworkID& frameworkId,
@@ -5206,18 +5378,28 @@ void Framework::recoverExecutor(const ExecutorState& state)
Executor* executor = new Executor(
slave, id(), state.info.get(), latest, directory, info.checkpoint());
- // Recover the libprocess PID if possible.
- if (run.get().libprocessPid.isSome()) {
- // When recovering in non-strict mode, the assumption is that the
- // slave can die after checkpointing the forked pid but before the
- // libprocess pid. So, it is not possible for the libprocess pid
- // to exist but not the forked pid. If so, it is a really bad
- // situation (e.g., disk corruption).
- CHECK_SOME(run.get().forkedPid)
- << "Failed to get forked pid for executor " << state.id
- << " of framework " << id();
-
- executor->pid = run.get().libprocessPid.get();
+ // Recover the libprocess PID if possible for PID based executors.
+ if (run.get().http.isSome()) {
+ if (!run.get().http.get()) {
+ // When recovering in non-strict mode, the assumption is that the
+ // slave can die after checkpointing the forked pid but before the
+ // libprocess pid. So, it is not possible for the libprocess pid
+ // to exist but not the forked pid. If so, it is a really bad
+ // situation (e.g., disk corruption).
+ CHECK_SOME(run.get().forkedPid)
+ << "Failed to get forked pid for executor " << state.id
+ << " of framework " << id();
+
+ executor->pid = run.get().libprocessPid.get();
+ } else {
+ // We set the PID to None() to signify that this is a HTTP based
+ // executor.
+ executor->pid = None();
+ }
+ } else {
+ // We set the PID to UPID() to signify that the connection type for this
+ // executor is unknown.
+ executor->pid = UPID();
}
// And finally recover all the executor's tasks.
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5ee133a..b7586ce 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -88,6 +88,7 @@ using namespace process;
class StatusUpdateManager;
struct Executor;
struct Framework;
+struct HttpConnection;
class Slave : public ProtobufProcess<Slave>
{
@@ -161,6 +162,12 @@ public:
void checkpointResources(const std::vector<Resource>& checkpointedResources);
+ void subscribe(
+ HttpConnection http,
+ const executor::Call::Subscribe& subscribe,
+ Framework* framework,
+ Executor* executor);
+
void registerExecutor(
const process::UPID& from,
const FrameworkID& frameworkId,