You are viewing a plain text version of this content. (The canonical link that accompanied this notice was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/14 20:07:32 UTC
[09/18] git commit: Fixed master to properly handle TASK_LOST updates generated by it.
Fixed master to properly handle TASK_LOST updates generated by it.
Review: https://reviews.apache.org/r/13446
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd3584a7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd3584a7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd3584a7
Branch: refs/heads/master
Commit: fd3584a71f68015fcfff70cc889b32fac28f941b
Parents: ba2ee7c
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 11:08:15 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:59 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 121 ++++++++++++++-----------------
src/messages/messages.proto | 2 +
src/tests/fault_tolerance_tests.cpp | 22 +++---
3 files changed, 68 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0675b52..6530008 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -875,6 +875,9 @@ void Master::launchTasks(const FrameworkID& frameworkId,
status->set_message("Task launched with invalid offer");
update->set_timestamp(Clock::now().secs());
update->set_uuid(UUID::random().toBytes());
+
+ LOG(INFO) << "Sending status update " << *update
+ << " for launch task attempt on invalid offer " << offerId;
send(framework->pid, message);
}
}
@@ -1163,10 +1166,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
// NOTE: We cannot use 'from' here to identify the slave as this is
// now sent by the StatusUpdateManagerProcess. Only 'pid' can
// be used to identify the slave.
- LOG(INFO) << "Status update from " << pid
- << ": task " << status.task_id()
- << " of framework " << update.framework_id()
- << " is now in state " << status.state();
+ LOG(INFO) << "Status update " << update << " from " << pid;
Slave* slave = getSlave(update.slave_id());
if (slave == NULL) {
@@ -1174,12 +1174,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
// If the slave is deactivated, we have already informed
// frameworks that its tasks were LOST, so the slave should
// shut down.
- LOG(WARNING) << "Ignoring status update from deactivated slave " << pid
+ LOG(WARNING) << "Ignoring status update " << update
+ << " from deactivated slave " << pid
<< " with id " << update.slave_id() << " ; asking slave "
<< " to shutdown";
send(pid, ShutdownMessage());
} else {
- LOG(WARNING) << "Ignoring status update from unknown slave " << pid
+ LOG(WARNING) << "Ignoring status update " << update
+ << " from unknown slave " << pid
<< " with id " << update.slave_id();
}
stats.invalidStatusUpdates++;
@@ -1190,7 +1192,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
- LOG(WARNING) << "Ignoring status update from " << pid << " ("
+ LOG(WARNING) << "Ignoring status update " << update
+ << " from " << pid << " ("
<< slave->info.hostname() << "): error, couldn't lookup "
<< "framework " << update.framework_id();
stats.invalidStatusUpdates++;
@@ -1206,7 +1209,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
// Lookup the task and see if we need to update anything locally.
Task* task = slave->getTask(update.framework_id(), status.task_id());
if (task == NULL) {
- LOG(WARNING) << "Status update from " << pid << " ("
+ LOG(WARNING) << "Status update " << update
+ << " from " << pid << " ("
<< slave->info.hostname() << "): error, couldn't lookup "
<< "task " << status.task_id();
stats.invalidStatusUpdates++;
@@ -1225,10 +1229,11 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
}
-void Master::exitedExecutor(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- int32_t status)
+void Master::exitedExecutor(
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ int32_t status)
{
// Only update master's internal data structures here for properly accounting.
// The TASK_LOST updates are handled by the slave.
@@ -1671,6 +1676,7 @@ void Master::processTasks(Offer* offer,
TASK_LOST,
error.get());
+ LOG(INFO) << "Sending status update " << update << " for invalid task";
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
send(framework->pid, message);
@@ -1786,20 +1792,14 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
<< " of framework " << task->framework_id()
<< " unknown to the slave " << slave->id;
- Framework* framework = getFramework(task->framework_id());
- if (framework != NULL) {
- const StatusUpdate& update = protobuf::createStatusUpdate(
- task->framework_id(),
- slave->id,
- task->task_id(),
- TASK_LOST,
- "Task was not received by the slave");
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ slave->id,
+ task->task_id(),
+ TASK_LOST,
+ "Task is unknown to the slave");
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(update);
- send(framework->pid, message);
- }
- removeTask(task);
+ statusUpdate(update, UPID());
}
}
@@ -1893,6 +1893,10 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
void Master::removeFramework(Framework* framework)
{
+ CHECK_NOTNULL(framework);
+
+ LOG(INFO) << "Removing framework " << framework->id;
+
if (framework->active) {
// Tell the allocator to stop allocating resources to this framework.
allocator->frameworkDeactivated(framework->id);
@@ -1961,6 +1965,10 @@ void Master::removeFramework(Slave* slave, Framework* framework)
CHECK_NOTNULL(slave);
CHECK_NOTNULL(framework);
+ LOG(INFO) << "Removing framework " << framework->id
+ << " from slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
+
// Remove pointers to framework's tasks in slaves, and send status updates.
foreachvalue (Task* task, utils::copy(slave->tasks)) {
// Remove tasks that belong to this framework.
@@ -1968,25 +1976,16 @@ void Master::removeFramework(Slave* slave, Framework* framework)
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected yet. For more info
// please see the comments in 'removeFramework(Framework*)'.
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(
- protobuf::createStatusUpdate(
- task->framework_id(),
- task->slave_id(),
- task->task_id(),
- TASK_LOST,
- "Slave " + slave->info.hostname() + " disconnected",
- (task->has_executor_id() ?
- Option<ExecutorID>(task->executor_id()) : None())));
-
- LOG(INFO) << "Sending status update " << message.update()
- << " due to disconnected slave " << slave->id
- << " (" << slave->info.hostname() << ")";
-
- send(framework->pid, message);
-
- // Remove the task from slave and framework.
- removeTask(task);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ task->slave_id(),
+ task->task_id(),
+ TASK_LOST,
+ "Slave " + slave->info.hostname() + " disconnected",
+ (task->has_executor_id()
+ ? Option<ExecutorID>(task->executor_id()) : None()));
+
+ statusUpdate(update, UPID());
}
}
@@ -2122,6 +2121,9 @@ void Master::removeSlave(Slave* slave)
{
CHECK_NOTNULL(slave);
+ LOG(INFO) << "Removing slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
+
// We do this first, to make sure any of the resources recovered
// below (e.g., removeTask()) are ignored by the allocator.
if (!slave->disconnected) {
@@ -2130,8 +2132,6 @@ void Master::removeSlave(Slave* slave)
// Remove pointers to slave's tasks in frameworks, and send status updates
foreachvalue (Task* task, utils::copy(slave->tasks)) {
- Framework* framework = getFramework(task->framework_id());
-
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected. This can be a tricky
// situation for frameworks that want to have high-availability,
@@ -2140,25 +2140,16 @@ void Master::removeSlave(Slave* slave)
// want to do is create a local Framework object to represent that
// framework until it fails over. See the TODO above in
// Master::reregisterSlave.
- if (framework != NULL) {
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(
- protobuf::createStatusUpdate(
- task->framework_id(),
- task->slave_id(),
- task->task_id(),
- TASK_LOST,
- "Slave " + slave->info.hostname() + " removed",
- (task->has_executor_id() ?
- Option<ExecutorID>(task->executor_id()) : None())));
-
- LOG(INFO) << "Sending status update " << message.update()
- << " due to the removal of slave "
- << slave->id << " (" << slave->info.hostname() << ")";
-
- send(framework->pid, message);
- }
- removeTask(task);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ task->slave_id(),
+ task->task_id(),
+ TASK_LOST,
+ "Slave " + slave->info.hostname() + " removed",
+ (task->has_executor_id() ?
+ Option<ExecutorID>(task->executor_id()) : None()));
+
+ statusUpdate(update, UPID());
}
foreach (Offer* offer, utils::copy(slave->offers)) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 19d4b38..4d400c2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -185,6 +185,8 @@ message KillTaskMessage {
}
+// NOTE: If 'pid' is present, scheduler driver sends an
+// acknowledgement to the pid.
message StatusUpdateMessage {
required StatusUpdate update = 1;
optional string pid = 2;
http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 7bfe3b1..d74463f 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -431,18 +431,16 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
// At this point, the slave still thinks it's registered, so we
// simulate a status update coming from the slave.
- StatusUpdateMessage statusUpdate;
- statusUpdate.set_pid(stringify(slave.get()));
- statusUpdate.mutable_update()->mutable_framework_id()->set_value(
- frameworkId.get().value());
- statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
- statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
- statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
- "task_id");
- statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
- statusUpdate.mutable_update()->set_timestamp(Clock::now().secs());
- statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
- process::post(master.get(), statusUpdate);
+ TaskID taskId;
+ taskId.set_value("task_id");
+ const StatusUpdate& update = createStatusUpdate(
+ frameworkId.get(), slaveId, taskId, TASK_RUNNING);
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ message.set_pid(stringify(slave.get()));
+
+ process::post(master.get(), message);
// The master should shutdown the slave upon receiving the update.
AWAIT_READY(shutdownMessage);