You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/07/01 21:13:22 UTC
git commit: Fixed a bug in master to properly remove a
non-checkpointing framework from a checkpointing slave.
Updated Branches:
refs/heads/master ddbb090e7 -> 777b745d1
Fixed a bug in master to properly remove a non-checkpointing
framework from a checkpointing slave.
Review: https://reviews.apache.org/r/12182
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/777b745d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/777b745d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/777b745d
Branch: refs/heads/master
Commit: 777b745d1e6745997aa6aa086a6126f9aec0be43
Parents: ddbb090
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Jun 28 14:40:49 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Jul 1 12:13:10 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 56 ++++++++++++++++++---------------
src/tests/slave_recovery_tests.cpp | 42 ++++++++++++++++++-------
2 files changed, 62 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/777b745d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4da8773..695fb93 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -470,19 +470,25 @@ void Master::exited(const UPID& pid)
} else {
// If a slave is checkpointing, remove all non-checkpointing
// frameworks from the slave.
- hashset<FrameworkID> ids;
- foreachvalue (Task* task, utils::copy(slave->tasks)) {
- if (!ids.contains(task->framework_id())) {
- ids.insert(task->framework_id());
- Framework* framework = getFramework(task->framework_id());
- if (framework != NULL && !framework->info.checkpoint()) {
- LOG(INFO) << "Removing non-checkpointing framework "
- << task->framework_id()
- << " from disconnected slave " << slave->id
- << "(" << slave->info.hostname() << ")";
-
- removeFramework(slave, framework);
- }
+
+ // First, collect all the frameworks running on this slave.
+ hashset<FrameworkID> frameworkIds;
+ foreachvalue (Task* task, slave->tasks) {
+ frameworkIds.insert(task->framework_id());
+ }
+ foreachkey (const FrameworkID& frameworkId, slave->executors) {
+ frameworkIds.insert(frameworkId);
+ }
+
+ // Now, remove all the non-checkpointing frameworks.
+ foreach (const FrameworkID& frameworkId, frameworkIds) {
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL && !framework->info.checkpoint()) {
+ LOG(INFO) << "Removing non-checkpointing framework " << frameworkId
+ << " from disconnected slave " << slave->id
+ << "(" << slave->info.hostname() << ")";
+
+ removeFramework(slave, framework);
}
}
}
@@ -1792,20 +1798,20 @@ void Master::removeFramework(Slave* slave, Framework* framework)
// over and the framework hasn't reconnected yet. For more info
// please see the comments in 'removeFramework(Framework*)'.
StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(task->framework_id());
+ message.mutable_update()->CopyFrom(
+ protobuf::createStatusUpdate(
+ task->framework_id(),
+ task->slave_id(),
+ task->task_id(),
+ TASK_LOST,
+ "Slave " + slave->info.hostname() + " disconnected",
+ (task->has_executor_id() ?
+ Option<ExecutorID>(task->executor_id()) : None())));
- if (task->has_executor_id()) {
- update->mutable_executor_id()->MergeFrom(task->executor_id());
- }
+ LOG(INFO) << "Sending status update " << message.update()
+ << " due to disconnected slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
- update->mutable_slave_id()->MergeFrom(task->slave_id());
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(task->task_id());
- status->set_state(TASK_LOST);
- status->set_message("Slave " + slave->info.hostname() + " disconnected");
- update->set_timestamp(Clock::now().secs());
- update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
// Remove the task from slave and framework.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/777b745d/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0195e68..14df749 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -798,28 +798,48 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
- TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ // Launch 2 tasks from this offer.
vector<TaskInfo> tasks;
- tasks.push_back(task); // Long-running task
+ Offer offer = offers.get()[0];
- Future<Nothing> update;
+ Offer offer1 = offer;
+ offer1.mutable_resources()->CopyFrom(Resources::parse("cpus:1;mem:512"));
+ tasks.push_back(createTask(offer1, "sleep 1000")); // Long-running task
+
+ Offer offer2 = offer;
+ offer2.mutable_resources()->CopyFrom(Resources::parse("cpus:1;mem:512"));
+ tasks.push_back(createTask(offer2, "sleep 1000")); // Long-running task
+
+ ASSERT_LE(Resources(offer1.resources()) + Resources(offer2.resources()),
+ Resources(offer.resources()));
+
+ Future<Nothing> update1;
+ Future<Nothing> update2;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureSatisfy(&update));
+ .WillOnce(FutureSatisfy(&update1))
+ .WillOnce(FutureSatisfy(&update2));
driver.launchTasks(offers.get()[0].id(), tasks);
- // Wait for TASK_RUNNING update.
- AWAIT_READY(update);
+ // Wait for TASK_RUNNING updates from the tasks.
+ AWAIT_READY(update1);
+ AWAIT_READY(update2);
- Future<TaskStatus> status;
+ // The master should generate TASK_LOST updates once the slave is stopped.
+ Future<TaskStatus> status1;
+ Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status1))
+ .WillOnce(FutureArg<1>(&status2));
this->Stop(slave.get());
- // Scheduler should receive the TASK_LOST update.
- AWAIT_READY(status);
- ASSERT_EQ(TASK_LOST, status.get().state());
+ // Scheduler should receive the TASK_LOST updates.
+ AWAIT_READY(status1);
+ ASSERT_EQ(TASK_LOST, status1.get().state());
+
+ AWAIT_READY(status2);
+ ASSERT_EQ(TASK_LOST, status2.get().state());
driver.stop();
driver.join();