You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/26 00:43:28 UTC
git commit: Fixed master to ignore messages from unregistered
framework.
Updated Branches:
refs/heads/vinod/master_ignore_framework [created] 4c11e98c4
Fixed master to ignore messages from unregistered framework.
Review: https://reviews.apache.org/r/15773
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c11e98c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c11e98c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c11e98c
Branch: refs/heads/vinod/master_ignore_framework
Commit: 4c11e98c4d0385ecb1369c93a0f58595eee54857
Parents: f051c45
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Nov 21 17:42:00 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Nov 25 15:36:40 2013 -0800
----------------------------------------------------------------------
src/master/master.cpp | 391 +++++++++++++++++++------------
src/master/master.hpp | 5 +
src/tests/fault_tolerance_tests.cpp | 121 ++++++++++
3 files changed, 373 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a08d012..f13d2a0 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -771,7 +771,8 @@ void Master::registerFramework(
}
if (!elected()) {
- LOG(WARNING) << "Ignoring register framework message since not elected yet";
+ LOG(WARNING) << "Ignoring register framework message from " << from
+ << " since not elected yet";
return;
}
@@ -850,8 +851,8 @@ void Master::reregisterFramework(
}
if (!elected()) {
- LOG(WARNING) << "Ignoring re-register framework message since "
- << "not elected yet";
+ LOG(WARNING) << "Ignoring re-register framework message from " << from
+ << " since not elected yet";
return;
}
@@ -911,6 +912,15 @@ void Master::reregisterFramework(
// us a different framework name, user name or executor info?
LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over";
failoverFramework(framework, from);
+ } else if (from != framework->pid) {
+ LOG(ERROR)
+ << "Framework " << frameworkInfo.id() << " at " << from
+ << " attempted to re-register while a framework at " << framework->pid
+ << " is already registered";
+ FrameworkErrorMessage message;
+ message.set_message("Framework failed over");
+ send(from, message);
+ return;
} else {
LOG(INFO) << "Allowing the Framework " << frameworkInfo.id()
<< " to re-register with an already used id";
@@ -920,9 +930,8 @@ void Master::reregisterFramework(
// replied to the offers but the driver might have dropped
// those messages since it wasn't connected to the master.
foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->resourcesRecovered(offer->framework_id(),
- offer->slave_id(),
- offer->resources());
+ allocator->resourcesRecovered(
+ offer->framework_id(), offer->slave_id(), offer->resources());
removeOffer(offer);
}
@@ -1000,8 +1009,10 @@ void Master::unregisterFramework(
if (framework->pid == from) {
removeFramework(framework);
} else {
- LOG(WARNING) << from << " tried to unregister framework; "
- << "expecting " << framework->pid;
+ LOG(WARNING)
+ << "Ignoring unregister framework message for framework " << frameworkId
+ << " from " << from << " because it is not from the registered"
+ << " framework " << framework->pid;
}
}
}
@@ -1013,15 +1024,22 @@ void Master::deactivateFramework(
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- if (framework->pid == from) {
- LOG(INFO) << from << " asked to deactivate framework " << frameworkId;
- deactivate(framework);
- } else {
- LOG(WARNING) << from << " tried to deactivate framework; "
- << "expecting " << framework->pid;
- }
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring deactivate framework message for framework " << frameworkId
+ << " because the framework cannot be found";
+ return;
}
+
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring deactivate framework message for framework " << frameworkId
+ << " from '" << from << "' because it is not from the registered"
+ << " framework '" << framework->pid << "'";
+ return;
+ }
+
+ deactivate(framework);
}
@@ -1053,177 +1071,257 @@ void Master::deactivate(Framework* framework)
}
-void Master::resourceRequest(const FrameworkID& frameworkId,
- const vector<Request>& requests)
+void Master::resourceRequest(
+ const UPID& from,
+ const FrameworkID& frameworkId,
+ const vector<Request>& requests)
{
+ Framework* framework = getFramework(frameworkId);
+
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring resource request message from framework " << frameworkId
+ << " because the framework cannot be found";
+ return;
+ }
+
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring resource request message from framework " << frameworkId
+ << " from '" << from << "' because it is not from the registered "
+ << " framework '" << framework->pid << "'";
+ return;
+ }
+
+ LOG(INFO) << "Requesting resources for framework " << frameworkId;
allocator->resourcesRequested(frameworkId, requests);
}
-void Master::launchTasks(const FrameworkID& frameworkId,
- const OfferID& offerId,
- const vector<TaskInfo>& tasks,
- const Filters& filters)
+void Master::launchTasks(
+ const UPID& from,
+ const FrameworkID& frameworkId,
+ const OfferID& offerId,
+ const vector<TaskInfo>& tasks,
+ const Filters& filters)
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- // TODO(benh): Support offer "hoarding" and allow multiple offers
- // *from the same slave* to be used to launch tasks. This can be
- // accomplished rather easily by collecting and merging all offers
- // into a mega-offer and passing that offer to
- // Master::processTasks.
- Offer* offer = getOffer(offerId);
- if (offer != NULL) {
- CHECK_EQ(offer->framework_id(), frameworkId)
- << "Offer " << offerId
- << " has invalid frameworkId " << offer->framework_id();
-
- Slave* slave = getSlave(offer->slave_id());
- CHECK(slave != NULL)
- << "Offer " << offerId << " outlived slave "
- << slave->id << " (" << slave->info.hostname() << ")";
- // If a slave is disconnected we should've removed its offers.
- CHECK(!slave->disconnected)
- << "Offer " << offerId << " outlived disconnected slave "
- << slave->id << " (" << slave->info.hostname() << ")";
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring launch tasks message for offer " << offerId
+ << " of framework " << frameworkId
+ << " because the framework cannot be found";
+ return;
+ }
- processTasks(offer, framework, slave, tasks, filters);
- } else {
- // The offer is gone (possibly rescinded, lost slave, re-reply
- // to same offer, etc). Report all tasks in it as failed.
- // TODO: Consider adding a new task state TASK_INVALID for
- // situations like these.
- LOG(WARNING) << "Offer " << offerId << " is no longer valid";
- foreach (const TaskInfo& task, tasks) {
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(task.task_id());
- status->set_state(TASK_LOST);
- status->set_message("Task launched with invalid offer");
- update->set_timestamp(Clock::now().secs());
- update->set_uuid(UUID::random().toBytes());
-
- LOG(INFO) << "Sending status update " << *update
- << " for launch task attempt on invalid offer " << offerId;
- send(framework->pid, message);
- }
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring launch tasks message for offer " << offerId
+ << " of framework " << frameworkId << " from '" << from
+ << "' because it is not from the registered framework '"
+ << framework->pid << "'";
+ return;
+ }
+
+ // TODO(benh): Support offer "hoarding" and allow multiple offers
+ // *from the same slave* to be used to launch tasks. This can be
+ // accomplished rather easily by collecting and merging all offers
+ // into a mega-offer and passing that offer to
+ // Master::processTasks.
+ Offer* offer = getOffer(offerId);
+ if (offer != NULL) {
+ CHECK_EQ(offer->framework_id(), frameworkId)
+ << "Offer " << offerId
+ << " has invalid frameworkId " << offer->framework_id();
+
+ Slave* slave = getSlave(offer->slave_id());
+ CHECK(slave != NULL)
+ << "Offer " << offerId << " outlived slave "
+ << slave->id << " (" << slave->info.hostname() << ")";
+
+ // If a slave is disconnected we should've removed its offers.
+ CHECK(!slave->disconnected)
+ << "Offer " << offerId << " outlived disconnected slave "
+ << slave->id << " (" << slave->info.hostname() << ")";
+
+ processTasks(offer, framework, slave, tasks, filters);
+ } else {
+ // The offer is gone (possibly rescinded, lost slave, re-reply
+ // to same offer, etc). Report all tasks in it as failed.
+ // TODO: Consider adding a new task state TASK_INVALID for
+ // situations like these.
+ LOG(WARNING) << "Offer " << offerId << " is no longer valid";
+ foreach (const TaskInfo& task, tasks) {
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(task.task_id());
+ status->set_state(TASK_LOST);
+ status->set_message("Task launched with invalid offer");
+ update->set_timestamp(Clock::now().secs());
+ update->set_uuid(UUID::random().toBytes());
+
+ LOG(INFO) << "Sending status update " << *update
+ << " for launch task attempt on invalid offer " << offerId;
+ send(framework->pid, message);
}
}
}
-void Master::reviveOffers(const FrameworkID& frameworkId)
+void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Reviving offers for framework " << framework->id;
- allocator->offersRevived(framework->id);
+
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring revive offers message for framework " << frameworkId
+ << " because the framework cannot be found";
+ return;
+ }
+
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring revive offers message for framework " << frameworkId
+ << " from '" << from << "' because it is not from the registered"
+ << " framework '" << framework->pid << "'";
+ return;
}
+
+ LOG(INFO) << "Reviving offers for framework " << framework->id;
+ allocator->offersRevived(framework->id);
}
-void Master::killTask(const FrameworkID& frameworkId,
- const TaskID& taskId)
+void Master::killTask(
+ const UPID& from,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId)
{
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Task* task = framework->getTask(taskId);
- if (task != NULL) {
- Slave* slave = getSlave(task->slave_id());
- CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
-
- // We add the task to 'killedTasks' here because the slave
- // might be partitioned or disconnected but the master
- // doesn't know it yet.
- slave->killedTasks.put(frameworkId, taskId);
-
- // NOTE: This task will be properly reconciled when the
- // disconnected slave re-registers with the master.
- if (!slave->disconnected) {
- LOG(INFO) << "Telling slave " << slave->id << " ("
- << slave->info.hostname() << ")"
- << " to kill task " << taskId
- << " of framework " << frameworkId;
-
- KillTaskMessage message;
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_task_id()->MergeFrom(taskId);
- send(slave->pid, message);
- }
- } else {
- // TODO(benh): Once the scheduler has persistance and
- // high-availability of it's tasks, it will be the one that
- // determines that this invocation of 'killTask' is silly, and
- // can just return "locally" (i.e., after hitting only the other
- // replicas). Unfortunately, it still won't know the slave id.
-
- LOG(WARNING) << "Cannot kill task " << taskId
- << " of framework " << frameworkId
- << " because it cannot be found";
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->set_state(TASK_LOST);
- status->set_message("Task not found");
- update->set_timestamp(Clock::now().secs());
- update->set_uuid(UUID::random().toBytes());
- send(framework->pid, message);
+
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring kill task message for task " << taskId << " of framework "
+ << frameworkId << " because the framework cannot be found";
+ return;
+ }
+
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring kill task message for task " << taskId
+ << " of framework " << frameworkId << " from '" << from
+ << "' because it is not from the registered framework '"
+ << framework->pid << "'";
+ return;
+ }
+
+ Task* task = framework->getTask(taskId);
+ if (task != NULL) {
+ Slave* slave = getSlave(task->slave_id());
+ CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
+
+ // We add the task to 'killedTasks' here because the slave
+ // might be partitioned or disconnected but the master
+ // doesn't know it yet.
+ slave->killedTasks.put(frameworkId, taskId);
+
+ // NOTE: This task will be properly reconciled when the
+ // disconnected slave re-registers with the master.
+ if (!slave->disconnected) {
+ LOG(INFO) << "Telling slave " << slave->id << " ("
+ << slave->info.hostname() << ")"
+ << " to kill task " << taskId
+ << " of framework " << frameworkId;
+
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_task_id()->MergeFrom(taskId);
+ send(slave->pid, message);
}
} else {
- LOG(WARNING) << "Failed to kill task " << taskId
+ // TODO(benh): Once the scheduler has persistence and
+ // high-availability of its tasks, it will be the one that
+ // determines that this invocation of 'killTask' is silly, and
+ // can just return "locally" (i.e., after hitting only the other
+ // replicas). Unfortunately, it still won't know the slave id.
+
+ LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
- << " because the framework cannot be found";
+ << " because the task cannot be found";
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->set_state(TASK_LOST);
+ status->set_message("Task not found");
+ update->set_timestamp(Clock::now().secs());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
}
}
-void Master::schedulerMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const string& data)
+void Master::schedulerMessage(
+ const UPID& from,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const string& data)
{
Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Slave* slave = getSlave(slaveId);
- if (slave != NULL) {
- if (!slave->disconnected) {
- LOG(INFO) << "Sending framework message for framework "
- << frameworkId << " to slave " << slaveId
- << " (" << slave->info.hostname() << ")";
-
- FrameworkToExecutorMessage message;
- message.mutable_slave_id()->MergeFrom(slaveId);
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_executor_id()->MergeFrom(executorId);
- message.set_data(data);
- send(slave->pid, message);
- stats.validFrameworkMessages++;
- } else {
- LOG(WARNING) << "Cannot send framework message for framework "
- << frameworkId << " to slave " << slaveId
- << " (" << slave->info.hostname() << ")"
- << " because slave is disconnected";
- stats.invalidFrameworkMessages++;
- }
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring framework message for executor " << executorId
+ << " of framework " << frameworkId
+ << " because the framework cannot be found";
+ stats.invalidFrameworkMessages++;
+ return;
+ }
+
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring framework message for executor " << executorId
+ << " of framework " << frameworkId << " from " << from
+ << " because it is not from the registered framework "
+ << framework->pid;
+ stats.invalidFrameworkMessages++;
+ return;
+ }
+
+ Slave* slave = getSlave(slaveId);
+ if (slave != NULL) {
+ if (!slave->disconnected) {
+ LOG(INFO) << "Sending framework message for framework "
+ << frameworkId << " to slave " << slaveId
+ << " (" << slave->info.hostname() << ")";
+
+ FrameworkToExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(slave->pid, message);
+
+ stats.validFrameworkMessages++;
} else {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << slaveId
- << " because slave does not exist";
+ << " (" << slave->info.hostname() << ")"
+ << " because slave is disconnected";
stats.invalidFrameworkMessages++;
}
} else {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << slaveId
- << " because framework does not exist";
+ << " because slave does not exist";
stats.invalidFrameworkMessages++;
}
}
@@ -1707,6 +1805,11 @@ void Master::offer(const FrameworkID& frameworkId,
}
+// TODO(vinod): If due to network partition there are two instances
+// of the framework that think they are leaders and try to
+// authenticate with master they would be stepping on each other's
+// toes. Currently it is tricky to detect this case because the
+// 'authenticate' message doesn't contain the 'FrameworkID'.
void Master::authenticate(const UPID& from, const UPID& pid)
{
// Deactivate the framework if it's already registered.
http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c86c1f1..624c133 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -110,19 +110,24 @@ public:
const process::UPID& from,
const FrameworkID& frameworkId);
void resourceRequest(
+ const process::UPID& from,
const FrameworkID& frameworkId,
const std::vector<Request>& requests);
void launchTasks(
+ const process::UPID& from,
const FrameworkID& frameworkId,
const OfferID& offerId,
const std::vector<TaskInfo>& tasks,
const Filters& filters);
void reviveOffers(
+ const process::UPID& from,
const FrameworkID& frameworkId);
void killTask(
+ const process::UPID& from,
const FrameworkID& frameworkId,
const TaskID& taskId);
void schedulerMessage(
+ const process::UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 6cb5829..687c981 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1379,6 +1379,127 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
}
+// This test verifies that a partitioned framework that still
+// thinks it is registered with the master cannot kill a task because
+// the master has re-registered another instance of the framework.
+// What this test does:
+// 1. Launch a master, slave and scheduler.
+// 2. Scheduler launches a task.
+// 3. Launch a second failed over scheduler.
+// 4. Make the first scheduler believe it is still registered.
+// 5. First scheduler attempts to kill the task which is ignored by the master.
+TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ // Start the first scheduler and launch a task.
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched1, registered(&driver1, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ driver1.start();
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ // Now start the second failed over scheduler.
+ MockScheduler sched2;
+
+ FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+ framework2 = DEFAULT_FRAMEWORK_INFO;
+ framework2.mutable_id()->MergeFrom(frameworkId);
+
+ MesosSchedulerDriver driver2(
+ &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+ .WillRepeatedly(Return()); // Ignore any offers.
+
+ // Drop the framework error message from the master to simulate
+ // a partitioned framework.
+ Future<FrameworkErrorMessage> frameworkErrorMessage =
+ DROP_PROTOBUF(FrameworkErrorMessage(), _ , _);
+
+ driver2.start();
+
+ AWAIT_READY(frameworkErrorMessage);
+
+ AWAIT_READY(registered);
+
+ // Now both the frameworks think they are registered with the
+ // master, but the master only knows about the second framework.
+
+ // A 'killTask' by first framework should be dropped by the master.
+ EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .Times(0);
+
+ // 'TASK_FINISHED' by the executor should reach the second framework.
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&status2));
+
+ Future<KillTaskMessage> killTaskMessage =
+ FUTURE_PROTOBUF(KillTaskMessage(), _, _);
+
+ driver1.killTask(status.get().task_id());
+
+ AWAIT_READY(killTaskMessage);
+
+ // By this point the master must have processed and ignored the
+ // 'killTask' message from the first framework. To verify this,
+ // the executor sends 'TASK_FINISHED' to ensure the only update
+ // received by the scheduler is 'TASK_FINISHED' and not
+ // 'TASK_KILLED'.
+ TaskStatus finishedStatus;
+ finishedStatus = status.get();
+ finishedStatus.set_state(TASK_FINISHED);
+ execDriver->sendStatusUpdate(finishedStatus);
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver1.stop();
+ driver2.stop();
+
+ driver1.join();
+ driver2.join();
+
+ Shutdown();
+}
+
+
// This test checks that a scheduler exit shuts down the executor.
TEST_F(FaultToleranceTest, SchedulerExit)
{
Re: git commit: Fixed master to ignore messages from unregistered framework.
Posted by Vinod Kone <vi...@gmail.com>.
oops sorry. looks like i created a branch instead of committing the patch
to head. will fix shortly.
On Mon, Nov 25, 2013 at 3:43 PM, <vi...@apache.org> wrote:
> Updated Branches:
> refs/heads/vinod/master_ignore_framework [created] 4c11e98c4
>
>
> Fixed master to ignore messages from unregistered framework.
>
> Review: https://reviews.apache.org/r/15773
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c11e98c
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c11e98c
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c11e98c
>
> Branch: refs/heads/vinod/master_ignore_framework
> Commit: 4c11e98c4d0385ecb1369c93a0f58595eee54857
> Parents: f051c45
> Author: Vinod Kone <vi...@twitter.com>
> Authored: Thu Nov 21 17:42:00 2013 -0800
> Committer: Vinod Kone <vi...@twitter.com>
> Committed: Mon Nov 25 15:36:40 2013 -0800
>
> ----------------------------------------------------------------------
> src/master/master.cpp | 391 +++++++++++++++++++------------
> src/master/master.hpp | 5 +
> src/tests/fault_tolerance_tests.cpp | 121 ++++++++++
> 3 files changed, 373 insertions(+), 144 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.cpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.cpp b/src/master/master.cpp
> index a08d012..f13d2a0 100644
> --- a/src/master/master.cpp
> +++ b/src/master/master.cpp
> @@ -771,7 +771,8 @@ void Master::registerFramework(
> }
>
> if (!elected()) {
> - LOG(WARNING) << "Ignoring register framework message since not elected yet";
> + LOG(WARNING) << "Ignoring register framework message from " << from
> + << " since not elected yet";
> return;
> }
>
> @@ -850,8 +851,8 @@ void Master::reregisterFramework(
> }
>
> if (!elected()) {
> - LOG(WARNING) << "Ignoring re-register framework message since "
> - << "not elected yet";
> + LOG(WARNING) << "Ignoring re-register framework message from " << from
> + << " since not elected yet";
> return;
> }
>
> @@ -911,6 +912,15 @@ void Master::reregisterFramework(
> // us a different framework name, user name or executor info?
> LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over";
> failoverFramework(framework, from);
> + } else if (from != framework->pid) {
> + LOG(ERROR)
> + << "Framework " << frameworkInfo.id() << " at " << from
> + << " attempted to re-register while a framework at " << framework->pid
> + << " is already registered";
> + FrameworkErrorMessage message;
> + message.set_message("Framework failed over");
> + send(from, message);
> + return;
> } else {
> LOG(INFO) << "Allowing the Framework " << frameworkInfo.id()
> << " to re-register with an already used id";
> @@ -920,9 +930,8 @@ void Master::reregisterFramework(
> // replied to the offers but the driver might have dropped
> // those messages since it wasn't connected to the master.
> foreach (Offer* offer, utils::copy(framework->offers)) {
> - allocator->resourcesRecovered(offer->framework_id(),
> - offer->slave_id(),
> - offer->resources());
> + allocator->resourcesRecovered(
> + offer->framework_id(), offer->slave_id(), offer->resources());
> removeOffer(offer);
> }
>
> @@ -1000,8 +1009,10 @@ void Master::unregisterFramework(
> if (framework->pid == from) {
> removeFramework(framework);
> } else {
> - LOG(WARNING) << from << " tried to unregister framework; "
> - << "expecting " << framework->pid;
> + LOG(WARNING)
> + << "Ignoring unregister framework message for framework " << frameworkId
> + << " from " << from << " because it is not from the registered"
> + << " framework " << framework->pid;
> }
> }
> }
> @@ -1013,15 +1024,22 @@ void Master::deactivateFramework(
> {
> Framework* framework = getFramework(frameworkId);
>
> - if (framework != NULL) {
> - if (framework->pid == from) {
> - LOG(INFO) << from << " asked to deactivate framework " << frameworkId;
> - deactivate(framework);
> - } else {
> - LOG(WARNING) << from << " tried to deactivate framework; "
> - << "expecting " << framework->pid;
> - }
> + if (framework == NULL) {
> + LOG(WARNING)
> + << "Ignoring deactivate framework message for framework " << frameworkId
> + << " because the framework cannot be found";
> + return;
> }
> +
> + if (from != framework->pid) {
> + LOG(WARNING)
> + << "Ignoring deactivate framework message for framework " << frameworkId
> + << " from '" << from << "' because it is not from the registered"
> + << " framework '" << framework->pid << "'";
> + return;
> + }
> +
> + deactivate(framework);
> }
>
>
> @@ -1053,177 +1071,257 @@ void Master::deactivate(Framework* framework)
> }
>
>
> -void Master::resourceRequest(const FrameworkID& frameworkId,
> - const vector<Request>& requests)
> +void Master::resourceRequest(
> + const UPID& from,
> + const FrameworkID& frameworkId,
> + const vector<Request>& requests)
> {
> + Framework* framework = getFramework(frameworkId);
> +
> + if (framework == NULL) {
> + LOG(WARNING)
> + << "Ignoring resource request message from framework " << frameworkId
> + << " because the framework cannot be found";
> + return;
> + }
> +
> + if (from != framework->pid) {
> + LOG(WARNING)
> + << "Ignoring resource request message from framework " << frameworkId
> + << " from '" << from << "' because it is not from the registered "
> + << " framework '" << framework->pid << "'";
> + return;
> + }
> +
> + LOG(INFO) << "Requesting resources for framework " << frameworkId;
> allocator->resourcesRequested(frameworkId, requests);
> }
>
>
> -void Master::launchTasks(const FrameworkID& frameworkId,
> - const OfferID& offerId,
> - const vector<TaskInfo>& tasks,
> - const Filters& filters)
> +void Master::launchTasks(
> + const UPID& from,
> + const FrameworkID& frameworkId,
> + const OfferID& offerId,
> + const vector<TaskInfo>& tasks,
> + const Filters& filters)
> {
> Framework* framework = getFramework(frameworkId);
> - if (framework != NULL) {
> - // TODO(benh): Support offer "hoarding" and allow multiple offers
> - // *from the same slave* to be used to launch tasks. This can be
> - // accomplished rather easily by collecting and merging all offers
> - // into a mega-offer and passing that offer to
> - // Master::processTasks.
> - Offer* offer = getOffer(offerId);
> - if (offer != NULL) {
> - CHECK_EQ(offer->framework_id(), frameworkId)
> - << "Offer " << offerId
> - << " has invalid frameworkId " << offer->framework_id();
> -
> - Slave* slave = getSlave(offer->slave_id());
> - CHECK(slave != NULL)
> - << "Offer " << offerId << " outlived slave "
> - << slave->id << " (" << slave->info.hostname() << ")";
>
> - // If a slave is disconnected we should've removed its offers.
> - CHECK(!slave->disconnected)
> - << "Offer " << offerId << " outlived disconnected slave "
> - << slave->id << " (" << slave->info.hostname() << ")";
> + if (framework == NULL) {
> + LOG(WARNING)
> + << "Ignoring launch tasks message for offer " << offerId
> + << " of framework " << frameworkId
> + << " because the framework cannot be found";
> + return;
> + }
>
> - processTasks(offer, framework, slave, tasks, filters);
> - } else {
> - // The offer is gone (possibly rescinded, lost slave, re-reply
> - // to same offer, etc). Report all tasks in it as failed.
> - // TODO: Consider adding a new task state TASK_INVALID for
> - // situations like these.
> - LOG(WARNING) << "Offer " << offerId << " is no longer valid";
> - foreach (const TaskInfo& task, tasks) {
> - StatusUpdateMessage message;
> - StatusUpdate* update = message.mutable_update();
> - update->mutable_framework_id()->MergeFrom(frameworkId);
> - TaskStatus* status = update->mutable_status();
> - status->mutable_task_id()->MergeFrom(task.task_id());
> - status->set_state(TASK_LOST);
> - status->set_message("Task launched with invalid offer");
> - update->set_timestamp(Clock::now().secs());
> - update->set_uuid(UUID::random().toBytes());
> -
> - LOG(INFO) << "Sending status update " << *update
> - << " for launch task attempt on invalid offer " << offerId;
> - send(framework->pid, message);
> - }
> + if (from != framework->pid) {
> + LOG(WARNING)
> + << "Ignoring launch tasks message for offer " << offerId
> + << " of framework " << frameworkId << " from '" << from
> + << "' because it is not from the registered framework '"
> + << framework->pid << "'";
> + return;
> + }
> +
> + // TODO(benh): Support offer "hoarding" and allow multiple offers
> + // *from the same slave* to be used to launch tasks. This can be
> + // accomplished rather easily by collecting and merging all offers
> + // into a mega-offer and passing that offer to
> + // Master::processTasks.
> + Offer* offer = getOffer(offerId);
> + if (offer != NULL) {
> + CHECK_EQ(offer->framework_id(), frameworkId)
> + << "Offer " << offerId
> + << " has invalid frameworkId " << offer->framework_id();
> +
> + Slave* slave = getSlave(offer->slave_id());
> + CHECK(slave != NULL)
> + << "Offer " << offerId << " outlived slave "
> + << slave->id << " (" << slave->info.hostname() << ")";
> +
> + // If a slave is disconnected we should've removed its offers.
> + CHECK(!slave->disconnected)
> + << "Offer " << offerId << " outlived disconnected slave "
> + << slave->id << " (" << slave->info.hostname() << ")";
> +
> + processTasks(offer, framework, slave, tasks, filters);
> + } else {
> + // The offer is gone (possibly rescinded, lost slave, re-reply
> + // to same offer, etc). Report all tasks in it as failed.
> + // TODO: Consider adding a new task state TASK_INVALID for
> + // situations like these.
> + LOG(WARNING) << "Offer " << offerId << " is no longer valid";
> + foreach (const TaskInfo& task, tasks) {
> + StatusUpdateMessage message;
> + StatusUpdate* update = message.mutable_update();
> + update->mutable_framework_id()->MergeFrom(frameworkId);
> + TaskStatus* status = update->mutable_status();
> + status->mutable_task_id()->MergeFrom(task.task_id());
> + status->set_state(TASK_LOST);
> + status->set_message("Task launched with invalid offer");
> + update->set_timestamp(Clock::now().secs());
> + update->set_uuid(UUID::random().toBytes());
> +
> + LOG(INFO) << "Sending status update " << *update
> + << " for launch task attempt on invalid offer " << offerId;
> + send(framework->pid, message);
> }
> }
> }
>
>
> -void Master::reviveOffers(const FrameworkID& frameworkId)
> +void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
> {
> Framework* framework = getFramework(frameworkId);
> - if (framework != NULL) {
> - LOG(INFO) << "Reviving offers for framework " << framework->id;
> - allocator->offersRevived(framework->id);
> +
> + if (framework == NULL) {
> + LOG(WARNING)
> + << "Ignoring revive offers message for framework " << frameworkId
> + << " because the framework cannot be found";
> + return;
> + }
> +
> + if (from != framework->pid) {
> + LOG(WARNING)
> + << "Ignoring revive offers message for framework " << frameworkId
> + << " from '" << from << "' because it is not from the registered"
> + << " framework '" << framework->pid << "'";
> + return;
> }
> +
> + LOG(INFO) << "Reviving offers for framework " << framework->id;
> + allocator->offersRevived(framework->id);
> }
>
>
> -void Master::killTask(const FrameworkID& frameworkId,
> - const TaskID& taskId)
> +void Master::killTask(
> + const UPID& from,
> + const FrameworkID& frameworkId,
> + const TaskID& taskId)
> {
> LOG(INFO) << "Asked to kill task " << taskId
> << " of framework " << frameworkId;
>
> Framework* framework = getFramework(frameworkId);
> - if (framework != NULL) {
> - Task* task = framework->getTask(taskId);
> - if (task != NULL) {
> - Slave* slave = getSlave(task->slave_id());
> - CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
> -
> - // We add the task to 'killedTasks' here because the slave
> - // might be partitioned or disconnected but the master
> - // doesn't know it yet.
> - slave->killedTasks.put(frameworkId, taskId);
> -
> - // NOTE: This task will be properly reconciled when the
> - // disconnected slave re-registers with the master.
> - if (!slave->disconnected) {
> - LOG(INFO) << "Telling slave " << slave->id << " ("
> - << slave->info.hostname() << ")"
> - << " to kill task " << taskId
> - << " of framework " << frameworkId;
> -
> - KillTaskMessage message;
> - message.mutable_framework_id()->MergeFrom(frameworkId);
> - message.mutable_task_id()->MergeFrom(taskId);
> - send(slave->pid, message);
> - }
> - } else {
> - // TODO(benh): Once the scheduler has persistance and
> - // high-availability of it's tasks, it will be the one that
> - // determines that this invocation of 'killTask' is silly, and
> - // can just return "locally" (i.e., after hitting only the other
> - // replicas). Unfortunately, it still won't know the slave id.
> -
> - LOG(WARNING) << "Cannot kill task " << taskId
> - << " of framework " << frameworkId
> - << " because it cannot be found";
> - StatusUpdateMessage message;
> - StatusUpdate* update = message.mutable_update();
> - update->mutable_framework_id()->MergeFrom(frameworkId);
> - TaskStatus* status = update->mutable_status();
> - status->mutable_task_id()->MergeFrom(taskId);
> - status->set_state(TASK_LOST);
> - status->set_message("Task not found");
> - update->set_timestamp(Clock::now().secs());
> - update->set_uuid(UUID::random().toBytes());
> - send(framework->pid, message);
> +
> + if (framework == NULL) {
> + LOG(WARNING)
> + << "Ignoring kill task message for task " << taskId << " of framework "
> + << frameworkId << " because the framework cannot be found";
> + return;
> + }
> +
> + if (from != framework->pid) {
> + LOG(WARNING)
> + << "Ignoring kill task message for task " << taskId
> + << " of framework " << frameworkId << " from '" << from
> + << "' because it is not from the registered framework '"
> + << framework->pid << "'";
> + return;
> + }
> +
> + Task* task = framework->getTask(taskId);
> + if (task != NULL) {
> + Slave* slave = getSlave(task->slave_id());
> + CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
> +
> + // We add the task to 'killedTasks' here because the slave
> + // might be partitioned or disconnected but the master
> + // doesn't know it yet.
> + slave->killedTasks.put(frameworkId, taskId);
> +
> + // NOTE: This task will be properly reconciled when the
> + // disconnected slave re-registers with the master.
> + if (!slave->disconnected) {
> + LOG(INFO) << "Telling slave " << slave->id << " ("
> + << slave->info.hostname() << ")"
> + << " to kill task " << taskId
> + << " of framework " << frameworkId;
> +
> + KillTaskMessage message;
> + message.mutable_framework_id()->MergeFrom(frameworkId);
> + message.mutable_task_id()->MergeFrom(taskId);
> + send(slave->pid, message);
> }
> } else {
> - LOG(WARNING) << "Failed to kill task " << taskId
> + // TODO(benh): Once the scheduler has persistance and
> + // high-availability of it's tasks, it will be the one that
> + // determines that this invocation of 'killTask' is silly, and
> + // can just return "locally" (i.e., after hitting only the other
> + // replicas). Unfortunately, it still won't know the slave id.
> +
> + LOG(WARNING) << "Cannot kill task " << taskId
> << " of framework " << frameworkId
> - << " because the framework cannot be found";
> + << " because the task cannot be found";
> + StatusUpdateMessage message;
> + StatusUpdate* update = message.mutable_update();
> + update->mutable_framework_id()->MergeFrom(frameworkId);
> + TaskStatus* status = update->mutable_status();
> + status->mutable_task_id()->MergeFrom(taskId);
> + status->set_state(TASK_LOST);
> + status->set_message("Task not found");
> + update->set_timestamp(Clock::now().secs());
> + update->set_uuid(UUID::random().toBytes());
> + send(framework->pid, message);
> }
> }
>
>
> -void Master::schedulerMessage(const SlaveID& slaveId,
> - const FrameworkID& frameworkId,
> - const ExecutorID& executorId,
> - const string& data)
> +void Master::schedulerMessage(
> + const UPID& from,
> + const SlaveID& slaveId,
> + const FrameworkID& frameworkId,
> + const ExecutorID& executorId,
> + const string& data)
> {
> Framework* framework = getFramework(frameworkId);
> - if (framework != NULL) {
> - Slave* slave = getSlave(slaveId);
> - if (slave != NULL) {
> - if (!slave->disconnected) {
> - LOG(INFO) << "Sending framework message for framework "
> - << frameworkId << " to slave " << slaveId
> - << " (" << slave->info.hostname() << ")";
> -
> - FrameworkToExecutorMessage message;
> - message.mutable_slave_id()->MergeFrom(slaveId);
> - message.mutable_framework_id()->MergeFrom(frameworkId);
> - message.mutable_executor_id()->MergeFrom(executorId);
> - message.set_data(data);
> - send(slave->pid, message);
>
> - stats.validFrameworkMessages++;
> - } else {
> - LOG(WARNING) << "Cannot send framework message for framework "
> - << frameworkId << " to slave " << slaveId
> - << " (" << slave->info.hostname() << ")"
> - << " because slave is disconnected";
> - stats.invalidFrameworkMessages++;
> - }
> + if (framework == NULL) {
> + LOG(WARNING)
> + << "Ignoring framework message for executor " << executorId
> + << " of framework " << frameworkId
> + << " because the framework cannot be found";
> + stats.invalidFrameworkMessages++;
> + return;
> + }
> +
> + if (from != framework->pid) {
> + LOG(WARNING)
> + << "Ignoring framework message for executor " << executorId
> + << " of framework " << frameworkId << " from " << from
> + << " because it is not from the registered framework "
> + << framework->pid;
> + stats.invalidFrameworkMessages++;
> + return;
> + }
> +
> + Slave* slave = getSlave(slaveId);
> + if (slave != NULL) {
> + if (!slave->disconnected) {
> + LOG(INFO) << "Sending framework message for framework "
> + << frameworkId << " to slave " << slaveId
> + << " (" << slave->info.hostname() << ")";
> +
> + FrameworkToExecutorMessage message;
> + message.mutable_slave_id()->MergeFrom(slaveId);
> + message.mutable_framework_id()->MergeFrom(frameworkId);
> + message.mutable_executor_id()->MergeFrom(executorId);
> + message.set_data(data);
> + send(slave->pid, message);
> +
> + stats.validFrameworkMessages++;
> } else {
> LOG(WARNING) << "Cannot send framework message for framework "
> << frameworkId << " to slave " << slaveId
> - << " because slave does not exist";
> + << " (" << slave->info.hostname() << ")"
> + << " because slave is disconnected";
> stats.invalidFrameworkMessages++;
> }
> } else {
> LOG(WARNING) << "Cannot send framework message for framework "
> << frameworkId << " to slave " << slaveId
> - << " because framework does not exist";
> + << " because slave does not exist";
> stats.invalidFrameworkMessages++;
> }
> }
> @@ -1707,6 +1805,11 @@ void Master::offer(const FrameworkID& frameworkId,
> }
>
>
> +// TODO(vinod): If due to network partition there are two instances
> +// of the framework that think they are leaders and try to
> +// authenticate with master they would be stepping on each other's
> +// toes. Currently it is tricky to detect this case because the
> +// 'authenticate' message doesn't contain the 'FrameworkID'.
> void Master::authenticate(const UPID& from, const UPID& pid)
> {
> // Deactivate the framework if it's already registered.
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.hpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.hpp b/src/master/master.hpp
> index c86c1f1..624c133 100644
> --- a/src/master/master.hpp
> +++ b/src/master/master.hpp
> @@ -110,19 +110,24 @@ public:
> const process::UPID& from,
> const FrameworkID& frameworkId);
> void resourceRequest(
> + const process::UPID& from,
> const FrameworkID& frameworkId,
> const std::vector<Request>& requests);
> void launchTasks(
> + const process::UPID& from,
> const FrameworkID& frameworkId,
> const OfferID& offerId,
> const std::vector<TaskInfo>& tasks,
> const Filters& filters);
> void reviveOffers(
> + const process::UPID& from,
> const FrameworkID& frameworkId);
> void killTask(
> + const process::UPID& from,
> const FrameworkID& frameworkId,
> const TaskID& taskId);
> void schedulerMessage(
> + const process::UPID& from,
> const SlaveID& slaveId,
> const FrameworkID& frameworkId,
> const ExecutorID& executorId,
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/tests/fault_tolerance_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
> index 6cb5829..687c981 100644
> --- a/src/tests/fault_tolerance_tests.cpp
> +++ b/src/tests/fault_tolerance_tests.cpp
> @@ -1379,6 +1379,127 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
> }
>
>
> +// This test verifies that a partitioned framework that still
> +// thinks it is registered with the master cannot kill a task because
> +// the master has re-registered another instance of the framework.
> +// What this test does:
> +// 1. Launch a master, slave and scheduler.
> +// 2. Scheduler launches a task.
> +// 3. Launch a second failed over scheduler.
> +// 4. Make the first scheduler believe it is still registered.
> +// 5. First scheduler attempts to kill the task which is ignored by the master.
> +TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
> +{
> + Try<PID<Master> > master = StartMaster();
> + ASSERT_SOME(master);
> +
> + MockExecutor exec(DEFAULT_EXECUTOR_ID);
> +
> + Try<PID<Slave> > slave = StartSlave(&exec);
> + ASSERT_SOME(slave);
> +
> + // Start the first scheduler and launch a task.
> + MockScheduler sched1;
> + MesosSchedulerDriver driver1(
> + &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
> +
> + FrameworkID frameworkId;
> + EXPECT_CALL(sched1, registered(&driver1, _, _))
> + .WillOnce(SaveArg<1>(&frameworkId));
> +
> + EXPECT_CALL(sched1, resourceOffers(&driver1, _))
> + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
> + .WillRepeatedly(Return()); // Ignore subsequent offers.
> +
> + Future<TaskStatus> status;
> + EXPECT_CALL(sched1, statusUpdate(&driver1, _))
> + .WillOnce(FutureArg<1>(&status));
> +
> + ExecutorDriver* execDriver;
> + EXPECT_CALL(exec, registered(_, _, _, _))
> + .WillOnce(SaveArg<0>(&execDriver));
> +
> + EXPECT_CALL(exec, launchTask(_, _))
> + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
> +
> + driver1.start();
> +
> + AWAIT_READY(status);
> + EXPECT_EQ(TASK_RUNNING, status.get().state());
> +
> + // Now start the second failed over scheduler.
> + MockScheduler sched2;
> +
> + FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
> + framework2 = DEFAULT_FRAMEWORK_INFO;
> + framework2.mutable_id()->MergeFrom(frameworkId);
> +
> + MesosSchedulerDriver driver2(
> + &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
> +
> + Future<Nothing> registered;
> + EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
> + .WillOnce(FutureSatisfy(&registered));
> +
> + EXPECT_CALL(sched2, resourceOffers(&driver2, _))
> + .WillRepeatedly(Return()); // Ignore any offers.
> +
> + // Drop the framework error message from the master to simulate
> + // a partitioned framework.
> + Future<FrameworkErrorMessage> frameworkErrorMessage =
> + DROP_PROTOBUF(FrameworkErrorMessage(), _ , _);
> +
> + driver2.start();
> +
> + AWAIT_READY(frameworkErrorMessage);
> +
> + AWAIT_READY(registered);
> +
> + // Now both the frameworks think they are registered with the
> + // master, but the master only knows about the second framework.
> +
> + // A 'killTask' by first framework should be dropped by the master.
> + EXPECT_CALL(sched1, statusUpdate(&driver1, _))
> + .Times(0);
> +
> + // 'TASK_FINISHED' by the executor should reach the second framework.
> + Future<TaskStatus> status2;
> + EXPECT_CALL(sched2, statusUpdate(&driver2, _))
> + .WillOnce(FutureArg<1>(&status2));
> +
> + Future<KillTaskMessage> killTaskMessage =
> + FUTURE_PROTOBUF(KillTaskMessage(), _, _);
> +
> + driver1.killTask(status.get().task_id());
> +
> + AWAIT_READY(killTaskMessage);
> +
> + // By this point the master must have processed and ignored the
> + // 'killTask' message from the first framework. To verify this,
> + // the executor sends 'TASK_FINISHED' to ensure the only update
> + // received by the scheduler is 'TASK_FINISHED' and not
> + // 'TASK_KILLED'.
> + TaskStatus finishedStatus;
> + finishedStatus = status.get();
> + finishedStatus.set_state(TASK_FINISHED);
> + execDriver->sendStatusUpdate(finishedStatus);
> +
> + AWAIT_READY(status2);
> + EXPECT_EQ(TASK_FINISHED, status2.get().state());
> +
> + EXPECT_CALL(exec, shutdown(_))
> + .Times(AtMost(1));
> +
> + driver1.stop();
> + driver2.stop();
> +
> + driver1.join();
> + driver2.join();
> +
> + Shutdown();
> +}
> +
> +
> // This test checks that a scheduler exit shuts down the executor.
> TEST_F(FaultToleranceTest, SchedulerExit)
> {
>
>