You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/02/28 23:31:58 UTC
git commit: Added completed frameworks/tasks to slave re-registration.
Repository: mesos
Updated Branches:
refs/heads/master 7bc952510 -> bcd4dc19e
Added completed frameworks/tasks to slave re-registration.
Fixes MESOS-767.
Review: https://reviews.apache.org/r/16724
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bcd4dc19
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bcd4dc19
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bcd4dc19
Branch: refs/heads/master
Commit: bcd4dc19e10c4b54b90fae713d14477b849147c2
Parents: 7bc9525
Author: Adam B <ad...@mesosphere.io>
Authored: Fri Feb 28 14:31:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Feb 28 14:31:36 2014 -0800
----------------------------------------------------------------------
include/mesos/scheduler.hpp | 2 +-
src/master/master.cpp | 50 ++++++++-
src/master/master.hpp | 21 +++-
src/messages/messages.proto | 15 +++
src/slave/slave.cpp | 35 ++++++-
src/tests/fault_tolerance_tests.cpp | 170 ++++++++++++++++++++++++++++++-
src/tests/mesos.hpp | 28 +++--
7 files changed, 298 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 55db177..85db111 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -410,7 +410,7 @@ private:
// URL for the master (e.g., zk://, file://, etc).
std::string url;
- // Mutex to enforce all non-callbacks are execute serially.
+ // Mutex to enforce all non-callbacks are executed serially.
pthread_mutex_t mutex;
// Condition variable for waiting until driver terminates.
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2e86a19..f7ba9aa 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -443,7 +443,8 @@ void Master::initialize()
&ReregisterSlaveMessage::slave_id,
&ReregisterSlaveMessage::slave,
&ReregisterSlaveMessage::executor_infos,
- &ReregisterSlaveMessage::tasks);
+ &ReregisterSlaveMessage::tasks,
+ &ReregisterSlaveMessage::completed_frameworks);
install<UnregisterSlaveMessage>(
&Master::unregisterSlave,
@@ -1003,7 +1004,7 @@ void Master::reregisterFramework(
}
}
- // N.B. Need to add the framwwork _after_ we add it's tasks
+ // N.B. Need to add the framework _after_ we add its tasks
// (above) so that we can properly determine the resources it's
// currently using!
addFramework(framework);
@@ -1866,7 +1867,8 @@ void Master::reregisterSlave(
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const vector<ExecutorInfo>& executorInfos,
- const vector<Task>& tasks)
+ const vector<Task>& tasks,
+ const vector<Archive::Framework>& completedFrameworks_)
{
if (!elected()) {
LOG(WARNING) << "Ignoring re-register slave message from "
@@ -1951,7 +1953,7 @@ void Master::reregisterSlave(
LOG(INFO) << "Attempting to re-register slave " << slave->id << " at "
<< slave->pid << " (" << slave->info.hostname() << ")";
- readdSlave(slave, executorInfos, tasks);
+ readdSlave(slave, executorInfos, tasks, completedFrameworks_);
}
// Send the latest framework pids to the slave.
@@ -2876,7 +2878,8 @@ void Master::addSlave(Slave* slave, bool reregister)
void Master::readdSlave(Slave* slave,
const vector<ExecutorInfo>& executorInfos,
- const vector<Task>& tasks)
+ const vector<Task>& tasks,
+ const vector<Archive::Framework>& completedFrameworks_)
{
CHECK_NOTNULL(slave);
@@ -2941,10 +2944,47 @@ void Master::readdSlave(Slave* slave,
resources[task.framework_id()] += task.resources();
}
+ foreach (const Archive::Framework& completedFramework_,
+ completedFrameworks_) {
+ LOG(INFO) << "Re-add completed framework "
+ << completedFramework_.framework_info().id()
+ << " from slave " << slave->id << " ("
+ << slave->info.hostname() << ")";
+ readdCompletedFramework(completedFramework_);
+ }
+
allocator->slaveAdded(slave->id, slave->info, resources);
}
+// Merges a completed framework reported by a re-registering slave into
+// the master's 'completedFrameworks': reuses the entry with the same
+// framework id if there is one, otherwise creates and records a new
+// entry, and then appends every reported task to that entry.
+void Master::readdCompletedFramework(
+    const Archive::Framework& completedFramework_)
+{
+  const FrameworkInfo& info = completedFramework_.framework_info();
+
+  // Look for an entry the master already holds for this framework id.
+  Option<shared_ptr<Framework> > known = None();
+  foreach (const shared_ptr<Framework>& candidate, completedFrameworks) {
+    if (candidate->id == info.id()) {
+      known = candidate;
+      break;
+    }
+  }
+
+  // Not seen before: build a fresh entry and remember it.
+  if (!known.isSome()) {
+    const UPID pid = completedFramework_.pid();
+    shared_ptr<Framework> created(new Framework(info, info.id(), pid));
+    VLOG(1) << "Re-adding completed framework " << created->id;
+    completedFrameworks.push_back(created);
+    known = created;
+  }
+
+  const shared_ptr<Framework>& target = known.get();
+  foreach (const Task& task, completedFramework_.tasks()) {
+    VLOG(2) << "Re-adding completed task " << task.task_id();
+    target->addCompletedTask(task);
+  }
+}
+
+
// Lose all of a slave's tasks and delete the slave object.
void Master::removeSlave(Slave* slave)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 72525d2..49a3e15 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -143,7 +143,9 @@ public:
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks);
+ const std::vector<Task>& tasks,
+ const std::vector<Archive::Framework>& completedFrameworks);
+
void unregisterSlave(
const SlaveID& slaveId);
void statusUpdate(
@@ -243,8 +245,11 @@ protected:
void addSlave(Slave* slave, bool reregister = false);
void readdSlave(Slave* slave,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks);
+ const std::vector<ExecutorInfo>& executorInfos,
+ const std::vector<Task>& tasks,
+ const std::vector<Archive::Framework>& completedFrameworks);
+
+ void readdCompletedFramework(const Archive::Framework& completedFramework);
// Lose all of a slave's tasks and delete the slave object
void removeSlave(Slave* slave);
@@ -465,7 +470,7 @@ struct Slave
}
bool hasExecutor(const FrameworkID& frameworkId,
- const ExecutorID& executorId) const
+ const ExecutorID& executorId) const
{
return executors.contains(frameworkId) &&
executors.get(frameworkId).get().contains(executorId);
@@ -543,7 +548,7 @@ struct Framework
Framework(const FrameworkInfo& _info,
const FrameworkID& _id,
const process::UPID& _pid,
- const process::Time& time)
+ const process::Time& time = process::Clock::now())
: id(_id),
info(_info),
pid(_pid),
@@ -584,6 +589,12 @@ struct Framework
resources -= task->resources();
}
+  // Appends a heap-allocated copy of 'task' to 'completedTasks'.
+  void addCompletedTask(const Task& task)
+  {
+    // TODO(adam-mesos): Check if completed task already exists.
+    memory::shared_ptr<Task> archived(new Task(task));
+    completedTasks.push_back(archived);
+  }
+
void addOffer(Offer* offer)
{
CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 922a8c4..c26a3d0 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -228,6 +228,7 @@ message ReregisterSlaveMessage {
required SlaveInfo slave = 2;
repeated ExecutorInfo executor_infos = 4;
repeated Task tasks = 3;
+ repeated Archive.Framework completed_frameworks = 5;
}
@@ -364,3 +365,17 @@ message AuthenticationErrorMessage {
optional string error = 1;
}
+
+// TODO(adam-mesos): Move this to an 'archive' package.
+/**
+ * Describes completed frameworks (and their tasks) for archival.
+ * Instances of Archive.Framework are carried in
+ * ReregisterSlaveMessage.completed_frameworks.
+ */
+message Archive {
+ // One completed framework together with its finished tasks.
+ message Framework {
+ // Description of the framework. The master reads the framework id
+ // from here, so senders must make sure 'id' is set.
+ required FrameworkInfo framework_info = 1;
+ // String form of the framework's PID; the master converts it to a UPID.
+ optional string pid = 2;
+ // Terminated and completed tasks of the framework's completed executors.
+ repeated Task tasks = 3;
+ }
+ // A collection of archived frameworks.
+ repeated Framework frameworks = 1;
+}
+
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 4f5349b..b350df4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -554,7 +554,7 @@ void Slave::doReliableRegistration()
message.mutable_slave()->CopyFrom(info);
message.mutable_slave()->mutable_id()->CopyFrom(info.id());
- foreachvalue (Framework* framework, frameworks){
+ foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
// Ignore terminated executors because they do not consume
// any resources.
@@ -596,6 +596,39 @@ void Slave::doReliableRegistration()
}
}
+ // Add completed frameworks.
+ foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
+ VLOG(1) << "Reregistering completed framework "
+ << completedFramework->id;
+ Archive::Framework* completedFramework_ =
+ message.add_completed_frameworks();
+ FrameworkInfo* frameworkInfo =
+ completedFramework_->mutable_framework_info();
+ frameworkInfo->CopyFrom(completedFramework->info);
+
+ // TODO(adam-mesos): Needed because FrameworkInfo doesn't have the id.
+ frameworkInfo->mutable_id()->CopyFrom(completedFramework->id);
+
+ completedFramework_->set_pid(completedFramework->pid);
+
+ foreach (const Owned<Executor>& executor,
+ completedFramework->completedExecutors) {
+ VLOG(2) << "Reregistering completed executor " << executor->id
+ << " with " << executor->terminatedTasks.size()
+ << " terminated tasks, " << executor->completedTasks.size()
+ << " completed tasks";
+ foreach (const Task* task, executor->terminatedTasks.values()) {
+ VLOG(2) << "Reregistering terminated task " << task->task_id();
+ completedFramework_->add_tasks()->CopyFrom(*task);
+ }
+ foreach (const memory::shared_ptr<Task>& task,
+ executor->completedTasks) {
+ VLOG(2) << "Reregistering completed task " << task->task_id();
+ completedFramework_->add_tasks()->CopyFrom(*task);
+ }
+ }
+ }
+
CHECK_SOME(master);
send(master.get(), message);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 59632b0..3f4796a 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -31,6 +31,7 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
+#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
@@ -63,6 +64,8 @@ using process::Message;
using process::Owned;
using process::PID;
using process::UPID;
+using process::http::OK;
+using process::http::Response;
using std::string;
using std::map;
@@ -655,6 +658,169 @@ TEST_F(FaultToleranceTest, MasterFailover)
}
+// TODO(adam-mesos): Use real JSON parser (see stout/json.hpp TODOs).
+// Returns true iff the value of the first occurrence of 'key' in the
+// JSON 'text' is the empty array "[]". Only compact JSON (no whitespace
+// after ':' or inside "[]") is recognized. Returns false, rather than
+// throwing std::out_of_range, when 'key' is absent or has no value.
+bool isJsonValueEmpty(const string& text, const string& key)
+{
+  const size_t keyPos = text.find(key);
+  if (keyPos == string::npos) {
+    return false;
+  }
+
+  const size_t colon = text.find(':', keyPos);
+  if (colon == string::npos) {
+    return false;
+  }
+
+  // The value is empty iff "[]" directly follows the colon ...
+  const size_t begin = colon + 1;
+  if (text.compare(begin, 2, "[]") != 0) {
+    return false;
+  }
+
+  // ... and is then terminated by a comma, by the closing brace of the
+  // enclosing object (the key is its last member), or by the end of the
+  // text. Looking only for the next comma misses the last-member case.
+  const size_t end = begin + 2;
+  return end >= text.size() || text[end] == ',' || text[end] == '}';
+}
+
+
+// This test ensures that a recovering master recovers completed frameworks
+// and tasks from a slave's re-registration.
+TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
+{
+  // Step 1. Start Master and Slave.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor executor(DEFAULT_EXECUTOR_ID);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+
+  Try<PID<Slave> > slave =
+    StartSlave(&executor, Owned<MasterDetector>(detector));
+  ASSERT_SOME(slave);
+
+  // Verify master/slave have 0 completed/running frameworks.
+  Future<Response> masterState = process::http::get(master.get(), "state.json");
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, masterState);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+      "application/json",
+      "Content-Type",
+      masterState);
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  // Step 2. Create/start framework.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  EXPECT_NE("", frameworkId.get().value());
+  AWAIT_READY(offers);
+  // Fatal on purpose: the steps below index offers.get()[0], which must
+  // not be evaluated when no offer was received.
+  ASSERT_NE(0u, offers.get().size());
+
+  // Step 3. Create/launch a task.
+  TaskInfo task = createTask(offers.get()[0], "exit 1", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Short-lived task.
+
+  EXPECT_CALL(executor, registered(_, _, _, _));
+  EXPECT_CALL(executor, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  // Verify master and slave recognize the running task/framework.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  Future<Response> slaveState = process::http::get(slave.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(slaveState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(slaveState.get().body, "\"frameworks\""));
+
+  // Step 4. Kill task.
+  EXPECT_CALL(executor, killTask(_, _))
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(statusKilled);
+  ASSERT_EQ(TASK_KILLED, statusKilled.get().state());
+
+  // The framework is still registered, so nothing is "completed" yet.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  slaveState = process::http::get(slave.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(slaveState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(slaveState.get().body, "\"frameworks\""));
+
+  // Step 5. Stop the framework, shutdown executor.
+  Future<Nothing> shutdown;
+  EXPECT_CALL(executor, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(executorTerminated);
+  AWAIT_READY(shutdown);
+
+  // Verify master sees completed framework.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  // Slave received message to shutdown the framework,
+  // need to wait for it to actually happen.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  // Verify slave sees completed framework.
+  slaveState = process::http::get(slave.get(), "state.json");
+  EXPECT_FALSE(isJsonValueEmpty(slaveState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(slaveState.get().body, "\"frameworks\""));
+
+  // Step 6. Simulate failed over master by restarting the master.
+  Stop(master.get());
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Verify new master knows of no running/completed frameworks.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Simulate a new master detected message to the slave.
+  detector->appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Verify completed framework/task in new Master.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  Shutdown();
+}
+
+
TEST_F(FaultToleranceTest, SchedulerFailover)
{
Try<PID<Master> > master = StartMaster();
@@ -1038,7 +1204,9 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
// Drop the first status update message
// between master and the scheduler.
Future<StatusUpdateMessage> statusUpdateMessage =
- DROP_PROTOBUF(StatusUpdateMessage(), _, Not(AnyOf(Eq(master.get()), Eq(slave.get()))));
+ DROP_PROTOBUF(StatusUpdateMessage(),
+ _,
+ Not(AnyOf(Eq(master.get()), Eq(slave.get()))));
driver1.launchTasks(offers.get()[0].id(), tasks);
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 018d4ff..f77fbfe 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -224,22 +224,30 @@ protected:
credential; })
-#define DEFAULT_EXECUTOR_ID \
+#define DEFAULT_EXECUTOR_ID \
DEFAULT_EXECUTOR_INFO.executor_id()
inline TaskInfo createTask(
const Offer& offer,
const std::string& command,
+ const Option<mesos::ExecutorID>& executorId = None(),
const std::string& name = "test-task",
const std::string& id = UUID::random().toString())
{
TaskInfo task;
task.set_name(name);
task.mutable_task_id()->set_value(id);
- task.mutable_slave_id()->MergeFrom(offer.slave_id());
- task.mutable_resources()->MergeFrom(offer.resources());
- task.mutable_command()->set_value(command);
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+ if (executorId.isSome()) {
+ ExecutorInfo executor;
+ executor.mutable_executor_id()->CopyFrom(executorId.get());
+ executor.mutable_command()->set_value(command);
+ task.mutable_executor()->CopyFrom(executor);
+ } else {
+ task.mutable_command()->set_value(command);
+ }
return task;
}
@@ -651,18 +659,18 @@ public:
: cpus(_cpus), mem(_mem) {}
virtual bool MatchAndExplain(const std::vector<Offer>& offers,
- ::testing::MatchResultListener* listener) const
+ ::testing::MatchResultListener* listener) const
{
double totalCpus = 0;
double totalMem = 0;
foreach (const Offer& offer, offers) {
foreach (const Resource& resource, offer.resources()) {
- if (resource.name() == "cpus") {
- totalCpus += resource.scalar().value();
- } else if (resource.name() == "mem") {
- totalMem += resource.scalar().value();
- }
+ if (resource.name() == "cpus") {
+ totalCpus += resource.scalar().value();
+ } else if (resource.name() == "mem") {
+ totalMem += resource.scalar().value();
+ }
}
}