You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/07 04:58:06 UTC
[5/5] git commit: Fixed master to properly reconcile KillTask messages.
Fixed master to properly reconcile KillTask messages.
Review: https://reviews.apache.org/r/13339
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a47b58e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a47b58e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a47b58e7
Branch: refs/heads/master
Commit: a47b58e72f0ce4eac459ca7960316c1f0b8ad12f
Parents: d88f1e3
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Aug 6 12:56:01 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 6 19:57:27 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 53 +++++++++++++++-----
src/master/master.hpp | 13 +++--
src/tests/slave_recovery_tests.cpp | 86 +++++++++++++++++++++++++++++++++
3 files changed, 135 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47b58e7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c5f8a93..a2f8929 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -905,17 +905,24 @@ void Master::killTask(const FrameworkID& frameworkId,
Slave* slave = getSlave(task->slave_id());
CHECK(slave != NULL);
- // TODO(vinod): Reconcile KillTaskMessages that are not received
- // by the slave (e.g., disconnected, partitioned).
- LOG(INFO) << "Telling slave " << slave->id << " ("
- << slave->info.hostname() << ")"
- << " to kill task " << taskId
- << " of framework " << frameworkId;
+ // We add the task to 'killedTasks' here because the slave
+ // might be partitioned or disconnected but the master
+ // doesn't know it yet.
+ slave->killedTasks.put(frameworkId, taskId);
- KillTaskMessage message;
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_task_id()->MergeFrom(taskId);
- send(slave->pid, message);
+ // NOTE: This task will be properly reconciled when the
+ // disconnected slave re-registers with the master.
+ if (!slave->disconnected) {
+ LOG(INFO) << "Telling slave " << slave->id << " ("
+ << slave->info.hostname() << ")"
+ << " to kill task " << taskId
+ << " of framework " << frameworkId;
+
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_task_id()->MergeFrom(taskId);
+ send(slave->pid, message);
+ }
} else {
// TODO(benh): Once the scheduler has persistance and
// high-availability of it's tasks, it will be the one that
@@ -1093,9 +1100,6 @@ void Master::reregisterSlave(const SlaveID& slaveId,
allocator->slaveAdded(slaveId, slaveInfo, resources);
}
- // Reconcile tasks between master and the slave.
- reconcileTasks(slave, tasks);
-
SlaveReregisteredMessage message;
message.mutable_slave_id()->MergeFrom(slave->id);
reply(message);
@@ -1103,6 +1107,11 @@ void Master::reregisterSlave(const SlaveID& slaveId,
// Update the slave pid and relink to it.
slave->pid = from;
link(slave->pid);
+
+ // Reconcile tasks between master and the slave.
+ // NOTE: This needs to be done after the registration message is
+ // sent to the slave and the new pid is linked.
+ reconcileTasks(slave, tasks);
} else {
// NOTE: This handles the case when the slave tries to
// re-register with a failed over master.
@@ -1800,6 +1809,24 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
removeTask(task);
}
}
+
+ // Send KillTaskMessages for tasks in 'killedTasks' that are
+ // still alive on the slave. This could happen if the slave
+ // did not receive KillTaskMessage because of a partition or
+ // disconnection.
+ foreach (const Task& task, tasks) {
+ if (!protobuf::isTerminalState(task.state()) &&
+ slave->killedTasks.contains(task.framework_id(), task.task_id())) {
+ LOG(WARNING) << " Slave " << slave->id << " (" << slave->info.hostname()
+ << ") has non-terminal task " << task.task_id()
+ << " that is supposed to be killed. Killing it now!";
+
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(task.framework_id());
+ message.mutable_task_id()->MergeFrom(task.task_id());
+ send(slave->pid, message);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47b58e7/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8824822..fb99603 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -309,7 +309,7 @@ struct Slave
tasks[key] = task;
LOG(INFO) << "Adding task " << task->task_id()
<< " with resources " << task->resources()
- << " on slave " << id;
+ << " on slave " << id << " (" << info.hostname() << ")";
resourcesInUse += task->resources();
}
@@ -319,9 +319,10 @@ struct Slave
std::make_pair(task->framework_id(), task->task_id());
CHECK(tasks.count(key) > 0);
tasks.erase(key);
+ killedTasks.remove(task->framework_id(), task->task_id());
LOG(INFO) << "Removing task " << task->task_id()
<< " with resources " << task->resources()
- << " on slave " << id;
+ << " on slave " << id << " (" << info.hostname() << ")";
resourcesInUse -= task->resources();
}
@@ -331,7 +332,7 @@ struct Slave
offers.insert(offer);
VLOG(1) << "Adding offer " << offer->id()
<< " with resources " << offer->resources()
- << " on slave " << id;
+ << " on slave " << id << " (" << info.hostname() << ")";
resourcesOffered += offer->resources();
}
@@ -341,7 +342,7 @@ struct Slave
offers.erase(offer);
VLOG(1) << "Removing offer " << offer->id()
<< " with resources " << offer->resources()
- << " on slave " << id;
+ << " on slave " << id << " (" << info.hostname() << ")";
resourcesOffered -= offer->resources();
}
@@ -400,6 +401,10 @@ struct Slave
// We should find a way to eliminate this.
hashmap<std::pair<FrameworkID, TaskID>, Task*> tasks;
+ // Tasks that frameworks have asked to be killed.
+ // This is used for reconciliation when the slave re-registers.
+ multihashmap<FrameworkID, TaskID> killedTasks;
+
// Active offers on this slave.
hashset<Offer*> offers;
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47b58e7/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6172cba..bd755f6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1498,3 +1498,89 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave)
this->Shutdown(); // Shutdown before isolator(s) get deallocated.
}
+
+
+// This test verifies that a KillTask message received by the
+// master when a checkpointing slave is disconnected is properly
+// reconciled when the slave reregisters.
+TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ Future<RegisterSlaveMessage> registerSlaveMessage =
+ FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
+ TypeParam isolator1;
+
+ slave::Flags flags = this->CreateSlaveFlags();
+
+ Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(registerSlaveMessage);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task
+
+ // Capture the slave and framework ids.
+ SlaveID slaveId = offers.get()[0].slave_id();
+ FrameworkID frameworkId = offers.get()[0].framework_id();
+
+ EXPECT_CALL(sched, statusUpdate(_, _)); // TASK_RUNNING
+
+ Future<Nothing> _statusUpdateAcknowledgement =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait for TASK_RUNNING update to be acknowledged.
+ AWAIT_READY(_statusUpdateAcknowledgement);
+
+ this->Stop(slave.get());
+
+ // Now send a KillTask message to the master. This will not be
+ // received by the slave because it is down.
+ driver.killTask(task.task_id());
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Now restart the slave (use same flags) with a new isolator.
+ TypeParam isolator2;
+
+ slave = this->StartSlave(&isolator2, flags);
+ ASSERT_SOME(slave);
+
+ // Scheduler should get a TASK_KILLED message.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_KILLED, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}