You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/12/05 00:27:29 UTC
[1/3] mesos git commit: Added support for recovering RunState for
HTTP based executors.
Repository: mesos
Updated Branches:
refs/heads/master 9000e9190 -> 517e84669
Added support for recovering RunState for HTTP based executors.
This change adds support for recovering the `RunState` for `HTTP` based
executors. Upon agent recovery, if no executor libprocess PID file is
found, it checks whether the HTTP marker file exists and, if so, sets
`RunState.http` to true (a recovered libprocess PID sets it to false).
The agent later uses this to recover `HTTP` based executors.
Review: https://reviews.apache.org/r/39297
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ebff7ce0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ebff7ce0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ebff7ce0
Branch: refs/heads/master
Commit: ebff7ce003f08f2d92af7e28e843f7844f065cab
Parents: 9000e91
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:24:29 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:24:30 2015 -0800
----------------------------------------------------------------------
src/slave/state.cpp | 56 ++++++++++++++++++++++++++++--------------------
src/slave/state.hpp | 4 ++++
2 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ebff7ce0/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index bc46cc6..9e470cb 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -500,37 +500,47 @@ Try<RunState> RunState::recover(
path = paths::getLibprocessPidPath(
rootDir, slaveId, frameworkId, executorId, containerId);
- if (!os::exists(path)) {
- // This could happen if the slave died before the executor
- // registered with the slave.
- LOG(WARNING)
- << "Failed to find executor libprocess pid file '" << path << "'";
- return state;
- }
-
- pid = os::read(path);
-
- if (pid.isError()) {
- message = "Failed to read executor libprocess pid from '" + path +
- "': " + pid.error();
+ if (os::exists(path)) {
+ pid = os::read(path);
+
+ if (pid.isError()) {
+ message = "Failed to read executor libprocess pid from '" + path +
+ "': " + pid.error();
+
+ if (strict) {
+ return Error(message);
+ } else {
+ LOG(WARNING) << message;
+ state.errors++;
+ return state;
+ }
+ }
- if (strict) {
- return Error(message);
- } else {
- LOG(WARNING) << message;
- state.errors++;
+ if (pid.get().empty()) {
+ // This could happen if the slave died after opening the file for
+ // writing but before it checkpointed anything.
+ LOG(WARNING) << "Found empty executor libprocess pid file '" << path
+ << "'";
return state;
}
+
+ state.libprocessPid = process::UPID(pid.get());
+ state.http = false;
+
+ return state;
}
- if (pid.get().empty()) {
- // This could happen if the slave died after opening the file for
- // writing but before it checkpointed anything.
- LOG(WARNING) << "Found empty executor libprocess pid file '" << path << "'";
+ path = paths::getExecutorHttpMarkerPath(
+ rootDir, slaveId, frameworkId, executorId, containerId);
+
+ if (!os::exists(path)) {
+ // This could happen if the slave died before the executor
+ // registered with the slave.
+ LOG(WARNING) << "Failed to find executor libprocess pid/http marker file";
return state;
}
- state.libprocessPid = process::UPID(pid.get());
+ state.http = true;
return state;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ebff7ce0/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index eb6b06c..d4235d5 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -207,6 +207,10 @@ struct RunState
Option<pid_t> forkedPid;
Option<process::UPID> libprocessPid;
+ // This represents if the executor is connected via HTTP. It can be None()
+ // when the connection type is unknown.
+ Option<bool> http;
+
// Executor terminated and all its updates acknowledged.
bool completed;
[2/3] mesos git commit: Added functionality for Subscribe/Subscribed
workflow for HTTP executors.
Posted by vi...@apache.org.
Added functionality for Subscribe/Subscribed workflow for HTTP
executors.
This change adds the functionality for executors to `Subscribe` via the
`api/v1/executor` endpoint. It also writes a marker file as part of the
`Subscribe` call if framework checkpointing is enabled. During recovery,
the agent can use this file to know that it should wait for the executor
to reconnect.
Review: https://reviews.apache.org/r/38877
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2c3faa8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2c3faa8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2c3faa8
Branch: refs/heads/master
Commit: a2c3faa8a08f60381c8b11c40fc51d1c877e749b
Parents: ebff7ce
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:25:11 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:25:13 2015 -0800
----------------------------------------------------------------------
src/slave/http.cpp | 3 +
src/slave/slave.cpp | 206 ++++++++++++++++++++++++++++++++++++++++++++---
src/slave/slave.hpp | 7 ++
3 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index eeebc79..cef568d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -308,6 +308,9 @@ Future<Response> Slave::Http::executor(const Request& request) const
ok.type = Response::PIPE;
ok.reader = pipe.reader();
+ HttpConnection http {pipe.writer(), responseContentType};
+ slave->subscribe(http, call.subscribe(), framework, executor);
+
return ok;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a33187e..9bd86e1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -87,6 +87,8 @@
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+using mesos::executor::Call;
+
using mesos::slave::QoSController;
using mesos::slave::QoSCorrection;
using mesos::slave::ResourceEstimator;
@@ -2398,6 +2400,176 @@ void Slave::_statusUpdateAcknowledgement(
}
+// Handles a `Subscribe` call received from an HTTP based executor over
+// the connection `http`. `framework` and `executor` must be non-null.
+//
+// - Agent TERMINATING, framework TERMINATING, or executor in
+// TERMINATING/TERMINATED: a `ShutdownExecutorMessage` is sent over
+// `http` and the connection is closed.
+// - Executor REGISTERING/RUNNING: any previous HTTP connection is
+// closed, the executor moves to RUNNING with `http` as its connection
+// (and no libprocess PID), an HTTP marker file is created if the
+// framework checkpoints, an `ExecutorRegisteredMessage` is sent, each
+// status update carried in `subscribe` is forwarded to `statusUpdate()`,
+// the queued tasks are passed to `runTasks` once the containerizer
+// resource update completes, and launched tasks still in TASK_STAGING
+// that are not listed in `subscribe.tasks()` are transitioned to
+// TASK_LOST.
+//
+// An agent state other than DISCONNECTED/RUNNING/TERMINATING or a
+// framework state other than RUNNING/TERMINATING fails a CHECK; any
+// other executor state is LOG(FATAL).
+void Slave::subscribe(
+ HttpConnection http,
+ const Call::Subscribe& subscribe,
+ Framework* framework,
+ Executor* executor)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(executor);
+
+ LOG(INFO) << "Received Subscribe request for HTTP executor " << *executor;
+
+ CHECK(state == DISCONNECTED || state == RUNNING ||
+ state == TERMINATING) << state;
+
+ if (state == TERMINATING) {
+ LOG(WARNING) << "Shutting down executor " << *executor << " as the slave "
+ << "is terminating";
+ http.send(ShutdownExecutorMessage());
+ http.close();
+ return;
+ }
+
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING)
+ << framework->state;
+
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Shutting down executor " << *executor << " as the "
+ << "framework is terminating";
+ http.send(ShutdownExecutorMessage());
+ http.close();
+ return;
+ }
+
+ switch (executor->state) {
+ case Executor::TERMINATING:
+ case Executor::TERMINATED:
+ // TERMINATED is possible if the executor forks, the parent process
+ // terminates and the child process tries to subscribe!
+ LOG(WARNING) << "Shutting down executor " << *executor
+ << " because it is in unexpected state " << executor->state;
+ http.send(ShutdownExecutorMessage());
+ http.close();
+ break;
+ case Executor::RUNNING:
+ case Executor::REGISTERING: {
+ // Close the earlier connection if one existed. This can even
+ // be a retried Subscribe request from an already connected
+ // executor.
+ if (executor->http.isSome()) {
+ LOG(WARNING) << "Closing already existing HTTP connection from "
+ << "executor " << *executor;
+ executor->http->close();
+ }
+
+ executor->state = Executor::RUNNING;
+
+ // Save the connection for the executor.
+ executor->http = http;
+ // Clear the libprocess PID: a None() PID denotes an HTTP based
+ // executor.
+ executor->pid = None();
+
+ if (framework->info.checkpoint()) {
+ // Write a marker file to indicate that this executor
+ // is HTTP based.
+ const string path = paths::getExecutorHttpMarkerPath(
+ metaDir,
+ info.id(),
+ framework->id(),
+ executor->id,
+ executor->containerId);
+
+ LOG(INFO) << "Creating a marker file for HTTP based executor "
+ << *executor << " at path '" << path << "'";
+ // NOTE(review): CHECK_SOME aborts the agent if the marker file
+ // cannot be created; confirm that crashing is preferable to
+ // logging and continuing without checkpointing.
+ CHECK_SOME(os::touch(path));
+ }
+
+ // Tell the executor it's registered. Queued tasks are delivered
+ // afterwards via `runTasks` (below).
+ // NOTE(review): presumably `executor->send()` now routes over the
+ // `executor->http` connection stored above — confirm.
+ ExecutorRegisteredMessage message;
+ message.mutable_executor_info()->MergeFrom(executor->info);
+ message.mutable_framework_id()->MergeFrom(framework->id());
+ message.mutable_framework_info()->MergeFrom(framework->info);
+ message.mutable_slave_id()->MergeFrom(info.id());
+ message.mutable_slave_info()->MergeFrom(info);
+ executor->send(message);
+
+ // Handle all the pending updates.
+ // The status update manager might have already checkpointed some
+ // of these pending updates (for example, if the slave died right
+ // after it checkpointed the update but before it could send the
+ // ACK to the executor). This is ok because the status update
+ // manager correctly handles duplicate updates.
+ foreach (const Call::Update& update, subscribe.updates()) {
+ // NOTE: This also updates the executor's resources!
+ statusUpdate(protobuf::createStatusUpdate(
+ framework->id(),
+ update.status(),
+ info.id()),
+ None());
+ }
+
+ // Update the resource limits for the container. Note that the
+ // resource limits include the currently queued tasks because we
+ // want the container to have enough resources to hold the
+ // upcoming tasks.
+ Resources resources = executor->resources;
+
+ // TODO(jieyu): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, executor->queuedTasks.values()) {
+ resources += task.resources();
+ }
+
+ // `onAny` runs `runTasks` whether or not the update succeeded;
+ // `runTasks` receives the update's result as its first argument.
+ containerizer->update(executor->containerId, resources)
+ .onAny(defer(self(),
+ &Self::runTasks,
+ lambda::_1,
+ framework->id(),
+ executor->id,
+ executor->containerId,
+ executor->queuedTasks.values()));
+
+ // Index the tasks the executor reports as unacknowledged.
+ hashmap<TaskID, TaskInfo> unackedTasks;
+ foreach (const TaskInfo& task, subscribe.tasks()) {
+ unackedTasks[task.task_id()] = task;
+ }
+
+ // Now, if there is any task still in STAGING state and not in
+ // unacknowledged 'tasks' known to the executor, the slave must
+ // have died before the executor received the task! We should
+ // transition it to TASK_LOST. We only consider/store
+ // unacknowledged 'tasks' at the executor driver because if a
+ // task has been acknowledged, the slave must have received
+ // an update for that task and transitioned it out of STAGING!
+ // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
+ // 'Task' so that we can relaunch such tasks! Currently we
+ // don't do it because 'TaskInfo.data' could be huge.
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor->launchedTasks.values()) {
+ if (task->state() == TASK_STAGING &&
+ !unackedTasks.contains(task->task_id())) {
+ LOG(INFO) << "Transitioning STAGED task " << task->task_id()
+ << " to LOST because it is unknown to the executor "
+ << executor->id;
+
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ framework->id(),
+ info.id(),
+ task->task_id(),
+ TASK_LOST,
+ TaskStatus::SOURCE_SLAVE,
+ UUID::random(),
+ "Task launched during slave restart",
+ TaskStatus::REASON_SLAVE_RESTARTED,
+ executor->id);
+
+ statusUpdate(update, UPID());
+ }
+ }
+
+ break;
+ }
+ default:
+ LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
+ << executor->state;
+ break;
+ }
+}
+
+
void Slave::registerExecutor(
const UPID& from,
const FrameworkID& frameworkId,
@@ -5206,18 +5378,28 @@ void Framework::recoverExecutor(const ExecutorState& state)
Executor* executor = new Executor(
slave, id(), state.info.get(), latest, directory, info.checkpoint());
- // Recover the libprocess PID if possible.
- if (run.get().libprocessPid.isSome()) {
- // When recovering in non-strict mode, the assumption is that the
- // slave can die after checkpointing the forked pid but before the
- // libprocess pid. So, it is not possible for the libprocess pid
- // to exist but not the forked pid. If so, it is a really bad
- // situation (e.g., disk corruption).
- CHECK_SOME(run.get().forkedPid)
- << "Failed to get forked pid for executor " << state.id
- << " of framework " << id();
-
- executor->pid = run.get().libprocessPid.get();
+ // Recover the libprocess PID if possible for PID based executors.
+ if (run.get().http.isSome()) {
+ if (!run.get().http.get()) {
+ // When recovering in non-strict mode, the assumption is that the
+ // slave can die after checkpointing the forked pid but before the
+ // libprocess pid. So, it is not possible for the libprocess pid
+ // to exist but not the forked pid. If so, it is a really bad
+ // situation (e.g., disk corruption).
+ CHECK_SOME(run.get().forkedPid)
+ << "Failed to get forked pid for executor " << state.id
+ << " of framework " << id();
+
+ executor->pid = run.get().libprocessPid.get();
+ } else {
+ // We set the PID to None() to signify that this is a HTTP based
+ // executor.
+ executor->pid = None();
+ }
+ } else {
+ // We set the PID to UPID() to signify that the connection type for this
+ // executor is unknown.
+ executor->pid = UPID();
}
// And finally recover all the executor's tasks.
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5ee133a..b7586ce 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -88,6 +88,7 @@ using namespace process;
class StatusUpdateManager;
struct Executor;
struct Framework;
+struct HttpConnection;
class Slave : public ProtobufProcess<Slave>
{
@@ -161,6 +162,12 @@ public:
void checkpointResources(const std::vector<Resource>& checkpointedResources);
+ void subscribe(
+ HttpConnection http,
+ const executor::Call::Subscribe& subscribe,
+ Framework* framework,
+ Executor* executor);
+
void registerExecutor(
const process::UPID& from,
const FrameworkID& frameworkId,
[3/3] mesos git commit: Added test for the Subscribe->Subscribed
workflow for the Executor HTTP API.
Posted by vi...@apache.org.
Added test for the Subscribe->Subscribed workflow for the Executor
HTTP API.
This change adds a basic test to validate the implementation of the
Subscribe->Subscribed workflow on the agent's `api/v1/executor`
endpoint.
Review: https://reviews.apache.org/r/38878
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/517e8466
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/517e8466
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/517e8466
Branch: refs/heads/master
Commit: 517e84669790f833f6cf0e7911968a3591c93419
Parents: a2c3faa
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:26:10 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:26:11 2015 -0800
----------------------------------------------------------------------
src/tests/executor_http_api_tests.cpp | 108 +++++++++++++++++++++++++++++
1 file changed, 108 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/517e8466/src/tests/executor_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp
index fe9df1f..1be657c 100644
--- a/src/tests/executor_http_api_tests.cpp
+++ b/src/tests/executor_http_api_tests.cpp
@@ -25,34 +25,45 @@
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
+#include <process/message.hpp>
#include <process/pid.hpp>
#include "common/http.hpp"
+#include "common/recordio.hpp"
#include "master/master.hpp"
+#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using mesos::internal::master::Master;
+using mesos::internal::recordio::Reader;
+
using mesos::internal::slave::Slave;
using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
using process::Clock;
using process::Future;
+using process::Message;
using process::PID;
using process::http::BadRequest;
using process::http::MethodNotAllowed;
using process::http::NotAcceptable;
using process::http::OK;
+using process::http::Pipe;
using process::http::Response;
using process::http::UnsupportedMediaType;
+using recordio::Decoder;
+
using std::string;
using std::vector;
+using testing::Eq;
using testing::WithParamInterface;
namespace mesos {
@@ -694,6 +705,103 @@ TEST_P(ExecutorHttpApiTest, StatusUpdateCallFailedValidation)
}
+// This test verifies that an executor receives a `Subscribed` event in
+// response to a `Subscribe` call sent to the agent's `api/v1/executor`
+// endpoint. The driver based executor's `RegisterExecutorMessage` is
+// dropped, so the executor can only register with the agent through the
+// HTTP `Subscribe` call. The first RecordIO-encoded event on the
+// streaming response is then decoded and checked.
+TEST_P(ExecutorHttpApiTest, Subscribe)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ MockExecutor exec(executorId);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(1u, offers.get().size());
+
+ // Intercept (and discard) the driver based executor's registration.
+ Future<Message> registerExecutorMessage =
+ DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+ TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
+ driver.launchTasks(offers.get()[0].id(), {taskInfo});
+
+ // Drop the `RegisterExecutorMessage` and then send a `Subscribe` request
+ // from the HTTP based executor.
+ AWAIT_READY(registerExecutorMessage);
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+ call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+ call.set_type(Call::SUBSCRIBE);
+
+ call.mutable_subscribe();
+
+ // Retrieve the parameter passed as content type to this test.
+ const ContentType contentType = GetParam();
+
+ // Request responses encoded in the parameterized content type.
+ process::http::Headers headers;
+ headers["Accept"] = stringify(contentType);
+
+ Future<Response> response = process::http::streaming::post(
+ slave.get(),
+ "api/v1/executor",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ EXPECT_SOME_EQ(stringify(contentType),
+ response.get().headers.get("Content-Type"));
+
+ EXPECT_SOME_EQ("chunked",
+ response.get().headers.get("Transfer-Encoding"));
+
+ ASSERT_EQ(Response::PIPE, response.get().type);
+
+ Option<Pipe::Reader> reader = response.get().reader;
+ ASSERT_SOME(reader);
+
+ // Decode each RecordIO record on the pipe into an `Event` using the
+ // content type under test.
+ auto deserializer =
+ lambda::bind(deserialize<Event>, contentType, lambda::_1);
+
+ Reader<Event> responseDecoder(
+ Decoder<Event>(deserializer),
+ reader.get());
+
+ Future<Result<Event>> event = responseDecoder.read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and if the ExecutorID matches.
+ ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+ // NOTE(review): unlike the other assertions in this test, the arguments
+ // below are (actual, expected); consider swapping for consistency.
+ ASSERT_EQ(event.get().get().subscribed().executor_info().executor_id(),
+ call.executor_id());
+
+ // Close the streaming connection before tearing down the cluster.
+ // NOTE(review): the scheduler driver is never stopped/joined here;
+ // consider `driver.stop(); driver.join();` before `Shutdown()` —
+ // verify against the other tests in this file.
+ reader.get().close();
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {