You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/26 00:43:28 UTC

git commit: Fixed master to ignore messages from unregistered framework.

Updated Branches:
  refs/heads/vinod/master_ignore_framework [created] 4c11e98c4


Fixed master to ignore messages from unregistered framework.

Review: https://reviews.apache.org/r/15773


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c11e98c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c11e98c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c11e98c

Branch: refs/heads/vinod/master_ignore_framework
Commit: 4c11e98c4d0385ecb1369c93a0f58595eee54857
Parents: f051c45
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Nov 21 17:42:00 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Nov 25 15:36:40 2013 -0800

----------------------------------------------------------------------
 src/master/master.cpp               | 391 +++++++++++++++++++------------
 src/master/master.hpp               |   5 +
 src/tests/fault_tolerance_tests.cpp | 121 ++++++++++
 3 files changed, 373 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a08d012..f13d2a0 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -771,7 +771,8 @@ void Master::registerFramework(
   }
 
   if (!elected()) {
-    LOG(WARNING) << "Ignoring register framework message since not elected yet";
+    LOG(WARNING) << "Ignoring register framework message from " << from
+                 << " since not elected yet";
     return;
   }
 
@@ -850,8 +851,8 @@ void Master::reregisterFramework(
   }
 
   if (!elected()) {
-    LOG(WARNING) << "Ignoring re-register framework message since "
-                 << "not elected yet";
+    LOG(WARNING) << "Ignoring re-register framework message from " << from
+                 << " since not elected yet";
     return;
   }
 
@@ -911,6 +912,15 @@ void Master::reregisterFramework(
       // us a different framework name, user name or executor info?
       LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over";
       failoverFramework(framework, from);
+    } else if (from != framework->pid) {
+      LOG(ERROR)
+        << "Framework " << frameworkInfo.id() << " at " << from
+        << " attempted to re-register while a framework at " << framework->pid
+        << " is already registered";
+      FrameworkErrorMessage message;
+      message.set_message("Framework failed over");
+      send(from, message);
+      return;
     } else {
       LOG(INFO) << "Allowing the Framework " << frameworkInfo.id()
                 << " to re-register with an already used id";
@@ -920,9 +930,8 @@ void Master::reregisterFramework(
       // replied to the offers but the driver might have dropped
       // those messages since it wasn't connected to the master.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        allocator->resourcesRecovered(offer->framework_id(),
-                                      offer->slave_id(),
-                                      offer->resources());
+        allocator->resourcesRecovered(
+            offer->framework_id(), offer->slave_id(), offer->resources());
         removeOffer(offer);
       }
 
@@ -1000,8 +1009,10 @@ void Master::unregisterFramework(
     if (framework->pid == from) {
       removeFramework(framework);
     } else {
-      LOG(WARNING) << from << " tried to unregister framework; "
-                   << "expecting " << framework->pid;
+      LOG(WARNING)
+        << "Ignoring unregister framework message for framework " << frameworkId
+        << " from " << from << " because it is not from the registered"
+        << " framework " << framework->pid;
     }
   }
 }
@@ -1013,15 +1024,22 @@ void Master::deactivateFramework(
 {
   Framework* framework = getFramework(frameworkId);
 
-  if (framework != NULL) {
-    if (framework->pid == from) {
-      LOG(INFO) << from << " asked to deactivate framework " << frameworkId;
-      deactivate(framework);
-    } else {
-      LOG(WARNING) << from << " tried to deactivate framework; "
-                   << "expecting " << framework->pid;
-    }
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring deactivate framework message for framework " << frameworkId
+      << " because the framework cannot be found";
+    return;
   }
+
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring deactivate framework message for framework " << frameworkId
+      << " from '" << from << "' because it is not from the registered"
+      << " framework '" << framework->pid << "'";
+    return;
+  }
+
+  deactivate(framework);
 }
 
 
@@ -1053,177 +1071,257 @@ void Master::deactivate(Framework* framework)
 }
 
 
-void Master::resourceRequest(const FrameworkID& frameworkId,
-                             const vector<Request>& requests)
+void Master::resourceRequest(
+    const UPID& from,
+    const FrameworkID& frameworkId,
+    const vector<Request>& requests)
 {
+  Framework* framework = getFramework(frameworkId);
+
+  if (framework == NULL) {
+    LOG(WARNING)
+        << "Ignoring resource request message from framework " << frameworkId
+        << " because the framework cannot be found";
+    return;
+  }
+
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring resource request message from framework " << frameworkId
+      << " from '" << from << "' because it is not from the registered "
+      << " framework '" << framework->pid << "'";
+    return;
+  }
+
+  LOG(INFO) << "Requesting resources for framework " << frameworkId;
   allocator->resourcesRequested(frameworkId, requests);
 }
 
 
-void Master::launchTasks(const FrameworkID& frameworkId,
-                         const OfferID& offerId,
-                         const vector<TaskInfo>& tasks,
-                         const Filters& filters)
+void Master::launchTasks(
+    const UPID& from,
+    const FrameworkID& frameworkId,
+    const OfferID& offerId,
+    const vector<TaskInfo>& tasks,
+    const Filters& filters)
 {
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    // TODO(benh): Support offer "hoarding" and allow multiple offers
-    // *from the same slave* to be used to launch tasks. This can be
-    // accomplished rather easily by collecting and merging all offers
-    // into a mega-offer and passing that offer to
-    // Master::processTasks.
-    Offer* offer = getOffer(offerId);
-    if (offer != NULL) {
-      CHECK_EQ(offer->framework_id(), frameworkId)
-          << "Offer " << offerId
-          << " has invalid frameworkId " << offer->framework_id();
-
-      Slave* slave = getSlave(offer->slave_id());
-      CHECK(slave != NULL)
-        << "Offer " << offerId << " outlived  slave "
-        << slave->id << " (" << slave->info.hostname() << ")";
 
-      // If a slave is disconnected we should've removed its offers.
-      CHECK(!slave->disconnected)
-        << "Offer " << offerId << " outlived disconnected slave "
-        << slave->id << " (" << slave->info.hostname() << ")";
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring launch tasks message for offer " << offerId
+      << " of framework " << frameworkId
+      << " because the framework cannot be found";
+    return;
+  }
 
-      processTasks(offer, framework, slave, tasks, filters);
-    } else {
-      // The offer is gone (possibly rescinded, lost slave, re-reply
-      // to same offer, etc). Report all tasks in it as failed.
-      // TODO: Consider adding a new task state TASK_INVALID for
-      // situations like these.
-      LOG(WARNING) << "Offer " << offerId << " is no longer valid";
-      foreach (const TaskInfo& task, tasks) {
-        StatusUpdateMessage message;
-        StatusUpdate* update = message.mutable_update();
-        update->mutable_framework_id()->MergeFrom(frameworkId);
-        TaskStatus* status = update->mutable_status();
-        status->mutable_task_id()->MergeFrom(task.task_id());
-        status->set_state(TASK_LOST);
-        status->set_message("Task launched with invalid offer");
-        update->set_timestamp(Clock::now().secs());
-        update->set_uuid(UUID::random().toBytes());
-
-        LOG(INFO) << "Sending status update " << *update
-                  << " for launch task attempt on invalid offer " << offerId;
-        send(framework->pid, message);
-      }
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring launch tasks message for offer " << offerId
+      << " of framework " << frameworkId << " from '" << from
+      << "' because it is not from the registered framework '"
+      << framework->pid << "'";
+    return;
+  }
+
+  // TODO(benh): Support offer "hoarding" and allow multiple offers
+  // *from the same slave* to be used to launch tasks. This can be
+  // accomplished rather easily by collecting and merging all offers
+  // into a mega-offer and passing that offer to
+  // Master::processTasks.
+  Offer* offer = getOffer(offerId);
+  if (offer != NULL) {
+    CHECK_EQ(offer->framework_id(), frameworkId)
+        << "Offer " << offerId
+        << " has invalid frameworkId " << offer->framework_id();
+
+    Slave* slave = getSlave(offer->slave_id());
+    CHECK(slave != NULL)
+      << "Offer " << offerId << " outlived  slave "
+      << slave->id << " (" << slave->info.hostname() << ")";
+
+    // If a slave is disconnected we should've removed its offers.
+    CHECK(!slave->disconnected)
+      << "Offer " << offerId << " outlived disconnected slave "
+      << slave->id << " (" << slave->info.hostname() << ")";
+
+    processTasks(offer, framework, slave, tasks, filters);
+  } else {
+    // The offer is gone (possibly rescinded, lost slave, re-reply
+    // to same offer, etc). Report all tasks in it as failed.
+    // TODO: Consider adding a new task state TASK_INVALID for
+    // situations like these.
+    LOG(WARNING) << "Offer " << offerId << " is no longer valid";
+    foreach (const TaskInfo& task, tasks) {
+      StatusUpdateMessage message;
+      StatusUpdate* update = message.mutable_update();
+      update->mutable_framework_id()->MergeFrom(frameworkId);
+      TaskStatus* status = update->mutable_status();
+      status->mutable_task_id()->MergeFrom(task.task_id());
+      status->set_state(TASK_LOST);
+      status->set_message("Task launched with invalid offer");
+      update->set_timestamp(Clock::now().secs());
+      update->set_uuid(UUID::random().toBytes());
+
+      LOG(INFO) << "Sending status update " << *update
+                << " for launch task attempt on invalid offer " << offerId;
+      send(framework->pid, message);
     }
   }
 }
 
 
-void Master::reviveOffers(const FrameworkID& frameworkId)
+void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
 {
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    LOG(INFO) << "Reviving offers for framework " << framework->id;
-    allocator->offersRevived(framework->id);
+
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring revive offers message for framework " << frameworkId
+      << " because the framework cannot be found";
+    return;
+  }
+
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring revive offers message for framework " << frameworkId
+      << " from '" << from << "' because it is not from the registered"
+      << " framework '" << framework->pid << "'";
+    return;
   }
+
+  LOG(INFO) << "Reviving offers for framework " << framework->id;
+  allocator->offersRevived(framework->id);
 }
 
 
-void Master::killTask(const FrameworkID& frameworkId,
-                      const TaskID& taskId)
+void Master::killTask(
+    const UPID& from,
+    const FrameworkID& frameworkId,
+    const TaskID& taskId)
 {
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
 
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Task* task = framework->getTask(taskId);
-    if (task != NULL) {
-      Slave* slave = getSlave(task->slave_id());
-      CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
-
-      // We add the task to 'killedTasks' here because the slave
-      // might be partitioned or disconnected but the master
-      // doesn't know it yet.
-      slave->killedTasks.put(frameworkId, taskId);
-
-      // NOTE: This task will be properly reconciled when the
-      // disconnected slave re-registers with the master.
-      if (!slave->disconnected) {
-        LOG(INFO) << "Telling slave " << slave->id << " ("
-                  << slave->info.hostname() << ")"
-                  << " to kill task " << taskId
-                  << " of framework " << frameworkId;
-
-        KillTaskMessage message;
-        message.mutable_framework_id()->MergeFrom(frameworkId);
-        message.mutable_task_id()->MergeFrom(taskId);
-        send(slave->pid, message);
-      }
-    } else {
-      // TODO(benh): Once the scheduler has persistance and
-      // high-availability of it's tasks, it will be the one that
-      // determines that this invocation of 'killTask' is silly, and
-      // can just return "locally" (i.e., after hitting only the other
-      // replicas). Unfortunately, it still won't know the slave id.
-
-      LOG(WARNING) << "Cannot kill task " << taskId
-                   << " of framework " << frameworkId
-                   << " because it cannot be found";
-      StatusUpdateMessage message;
-      StatusUpdate* update = message.mutable_update();
-      update->mutable_framework_id()->MergeFrom(frameworkId);
-      TaskStatus* status = update->mutable_status();
-      status->mutable_task_id()->MergeFrom(taskId);
-      status->set_state(TASK_LOST);
-      status->set_message("Task not found");
-      update->set_timestamp(Clock::now().secs());
-      update->set_uuid(UUID::random().toBytes());
-      send(framework->pid, message);
+
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring kill task message for task " << taskId << " of framework "
+      << frameworkId << " because the framework cannot be found";
+    return;
+  }
+
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring kill task message for task " << taskId
+      << " of framework " << frameworkId << " from '" << from
+      << "' because it is not from the registered framework '"
+      << framework->pid << "'";
+    return;
+  }
+
+  Task* task = framework->getTask(taskId);
+  if (task != NULL) {
+    Slave* slave = getSlave(task->slave_id());
+    CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
+
+    // We add the task to 'killedTasks' here because the slave
+    // might be partitioned or disconnected but the master
+    // doesn't know it yet.
+    slave->killedTasks.put(frameworkId, taskId);
+
+    // NOTE: This task will be properly reconciled when the
+    // disconnected slave re-registers with the master.
+    if (!slave->disconnected) {
+      LOG(INFO) << "Telling slave " << slave->id << " ("
+                << slave->info.hostname() << ")"
+                << " to kill task " << taskId
+                << " of framework " << frameworkId;
+
+      KillTaskMessage message;
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_task_id()->MergeFrom(taskId);
+      send(slave->pid, message);
     }
   } else {
-    LOG(WARNING) << "Failed to kill task " << taskId
+    // TODO(benh): Once the scheduler has persistence and
+    // high-availability of its tasks, it will be the one that
+    // determines that this invocation of 'killTask' is silly, and
+    // can just return "locally" (i.e., after hitting only the other
+    // replicas). Unfortunately, it still won't know the slave id.
+
+    LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId
-                 << " because the framework cannot be found";
+                 << " because the task cannot be found";
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus* status = update->mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->set_state(TASK_LOST);
+    status->set_message("Task not found");
+    update->set_timestamp(Clock::now().secs());
+    update->set_uuid(UUID::random().toBytes());
+    send(framework->pid, message);
   }
 }
 
 
-void Master::schedulerMessage(const SlaveID& slaveId,
-                              const FrameworkID& frameworkId,
-                              const ExecutorID& executorId,
-                              const string& data)
+void Master::schedulerMessage(
+    const UPID& from,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const string& data)
 {
   Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Slave* slave = getSlave(slaveId);
-    if (slave != NULL) {
-      if (!slave->disconnected) {
-        LOG(INFO) << "Sending framework message for framework "
-                  << frameworkId << " to slave " << slaveId
-                  << " (" << slave->info.hostname() << ")";
-
-        FrameworkToExecutorMessage message;
-        message.mutable_slave_id()->MergeFrom(slaveId);
-        message.mutable_framework_id()->MergeFrom(frameworkId);
-        message.mutable_executor_id()->MergeFrom(executorId);
-        message.set_data(data);
-        send(slave->pid, message);
 
-        stats.validFrameworkMessages++;
-      } else {
-        LOG(WARNING) << "Cannot send framework message for framework "
-                     << frameworkId << " to slave " << slaveId
-                     << " (" << slave->info.hostname() << ")"
-                     << " because slave is disconnected";
-        stats.invalidFrameworkMessages++;
-      }
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring framework message for executor " << executorId
+      << " of framework " << frameworkId
+      << " because the framework cannot be found";
+    stats.invalidFrameworkMessages++;
+    return;
+  }
+
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring framework message for executor " << executorId
+      << " of framework " << frameworkId << " from " << from
+      << " because it is not from the registered framework "
+      << framework->pid;
+    stats.invalidFrameworkMessages++;
+    return;
+  }
+
+  Slave* slave = getSlave(slaveId);
+  if (slave != NULL) {
+    if (!slave->disconnected) {
+      LOG(INFO) << "Sending framework message for framework "
+                << frameworkId << " to slave " << slaveId
+                << " (" << slave->info.hostname() << ")";
+
+      FrameworkToExecutorMessage message;
+      message.mutable_slave_id()->MergeFrom(slaveId);
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_executor_id()->MergeFrom(executorId);
+      message.set_data(data);
+      send(slave->pid, message);
+
+      stats.validFrameworkMessages++;
     } else {
       LOG(WARNING) << "Cannot send framework message for framework "
                    << frameworkId << " to slave " << slaveId
-                   << " because slave does not exist";
+                   << " (" << slave->info.hostname() << ")"
+                   << " because slave is disconnected";
       stats.invalidFrameworkMessages++;
     }
   } else {
     LOG(WARNING) << "Cannot send framework message for framework "
                  << frameworkId << " to slave " << slaveId
-                 << " because framework does not exist";
+                 << " because slave does not exist";
     stats.invalidFrameworkMessages++;
   }
 }
@@ -1707,6 +1805,11 @@ void Master::offer(const FrameworkID& frameworkId,
 }
 
 
+// TODO(vinod): If due to network partition there are two instances
+// of the framework that think they are leaders and try to
+// authenticate with master they would be stepping on each other's
+// toes. Currently it is tricky to detect this case because the
+// 'authenticate' message doesn't contain the 'FrameworkID'.
 void Master::authenticate(const UPID& from, const UPID& pid)
 {
   // Deactivate the framework if it's already registered.

http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c86c1f1..624c133 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -110,19 +110,24 @@ public:
       const process::UPID& from,
       const FrameworkID& frameworkId);
   void resourceRequest(
+      const process::UPID& from,
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests);
   void launchTasks(
+      const process::UPID& from,
       const FrameworkID& frameworkId,
       const OfferID& offerId,
       const std::vector<TaskInfo>& tasks,
       const Filters& filters);
   void reviveOffers(
+      const process::UPID& from,
       const FrameworkID& frameworkId);
   void killTask(
+      const process::UPID& from,
       const FrameworkID& frameworkId,
       const TaskID& taskId);
   void schedulerMessage(
+      const process::UPID& from,
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 6cb5829..687c981 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1379,6 +1379,127 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
 }
 
 
+// This test verifies that a partitioned framework that still
+// thinks it is registered with the master cannot kill a task because
+// the master has re-registered another instance of the framework.
+// What this test does:
+// 1. Launch a master, slave and scheduler.
+// 2. Scheduler launches a task.
+// 3. Launch a second failed over scheduler.
+// 4. Make the first scheduler believe it is still registered.
+// 5. First scheduler attempts to kill the task which is ignored by the master.
+TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  // Start the first scheduler and launch a task.
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  driver1.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Now start the second failed over scheduler.
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId);
+
+  MesosSchedulerDriver driver2(
+      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillRepeatedly(Return()); // Ignore any offers.
+
+  // Drop the framework error message from the master to simulate
+  // a partitioned framework.
+  Future<FrameworkErrorMessage> frameworkErrorMessage =
+    DROP_PROTOBUF(FrameworkErrorMessage(), _ , _);
+
+  driver2.start();
+
+  AWAIT_READY(frameworkErrorMessage);
+
+  AWAIT_READY(registered);
+
+  // Now both the frameworks think they are registered with the
+  // master, but the master only knows about the second framework.
+
+  // A 'killTask' by first framework should be dropped by the master.
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .Times(0);
+
+  // 'TASK_FINISHED' by the executor should reach the second framework.
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  Future<KillTaskMessage> killTaskMessage =
+    FUTURE_PROTOBUF(KillTaskMessage(), _, _);
+
+  driver1.killTask(status.get().task_id());
+
+  AWAIT_READY(killTaskMessage);
+
+  // By this point the master must have processed and ignored the
+  // 'killTask' message from the first framework. To verify this,
+  // the executor sends 'TASK_FINISHED' to ensure the only update
+  // received by the scheduler is 'TASK_FINISHED' and not
+  // 'TASK_KILLED'.
+  TaskStatus finishedStatus;
+  finishedStatus = status.get();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver1.stop();
+  driver2.stop();
+
+  driver1.join();
+  driver2.join();
+
+  Shutdown();
+}
+
+
 // This test checks that a scheduler exit shuts down the executor.
 TEST_F(FaultToleranceTest, SchedulerExit)
 {


Re: git commit: Fixed master to ignore messages from unregistered framework.

Posted by Vinod Kone <vi...@gmail.com>.
Oops, sorry. It looks like I created a branch instead of committing the patch
to head. Will fix shortly.


On Mon, Nov 25, 2013 at 3:43 PM, <vi...@apache.org> wrote:

> Updated Branches:
>   refs/heads/vinod/master_ignore_framework [created] 4c11e98c4
>
>
> Fixed master to ignore messages from unregistered framework.
>
> Review: https://reviews.apache.org/r/15773
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c11e98c
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c11e98c
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c11e98c
>
> Branch: refs/heads/vinod/master_ignore_framework
> Commit: 4c11e98c4d0385ecb1369c93a0f58595eee54857
> Parents: f051c45
> Author: Vinod Kone <vi...@twitter.com>
> Authored: Thu Nov 21 17:42:00 2013 -0800
> Committer: Vinod Kone <vi...@twitter.com>
> Committed: Mon Nov 25 15:36:40 2013 -0800
>
> ----------------------------------------------------------------------
>  src/master/master.cpp               | 391 +++++++++++++++++++------------
>  src/master/master.hpp               |   5 +
>  src/tests/fault_tolerance_tests.cpp | 121 ++++++++++
>  3 files changed, 373 insertions(+), 144 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.cpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.cpp b/src/master/master.cpp
> index a08d012..f13d2a0 100644
> --- a/src/master/master.cpp
> +++ b/src/master/master.cpp
> @@ -771,7 +771,8 @@ void Master::registerFramework(
>    }
>
>    if (!elected()) {
> -    LOG(WARNING) << "Ignoring register framework message since not
> elected yet";
> +    LOG(WARNING) << "Ignoring register framework message from " << from
> +                 << " since not elected yet";
>      return;
>    }
>
> @@ -850,8 +851,8 @@ void Master::reregisterFramework(
>    }
>
>    if (!elected()) {
> -    LOG(WARNING) << "Ignoring re-register framework message since "
> -                 << "not elected yet";
> +    LOG(WARNING) << "Ignoring re-register framework message from " << from
> +                 << " since not elected yet";
>      return;
>    }
>
> @@ -911,6 +912,15 @@ void Master::reregisterFramework(
>        // us a different framework name, user name or executor info?
>        LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over";
>        failoverFramework(framework, from);
> +    } else if (from != framework->pid) {
> +      LOG(ERROR)
> +        << "Framework " << frameworkInfo.id() << " at " << from
> +        << " attempted to re-register while a framework at " <<
> framework->pid
> +        << " is already registered";
> +      FrameworkErrorMessage message;
> +      message.set_message("Framework failed over");
> +      send(from, message);
> +      return;
>      } else {
>        LOG(INFO) << "Allowing the Framework " << frameworkInfo.id()
>                  << " to re-register with an already used id";
> @@ -920,9 +930,8 @@ void Master::reregisterFramework(
>        // replied to the offers but the driver might have dropped
>        // those messages since it wasn't connected to the master.
>        foreach (Offer* offer, utils::copy(framework->offers)) {
> -        allocator->resourcesRecovered(offer->framework_id(),
> -                                      offer->slave_id(),
> -                                      offer->resources());
> +        allocator->resourcesRecovered(
> +            offer->framework_id(), offer->slave_id(), offer->resources());
>          removeOffer(offer);
>        }
>
> @@ -1000,8 +1009,10 @@ void Master::unregisterFramework(
>      if (framework->pid == from) {
>        removeFramework(framework);
>      } else {
> -      LOG(WARNING) << from << " tried to unregister framework; "
> -                   << "expecting " << framework->pid;
> +      LOG(WARNING)
> +        << "Ignoring unregister framework message for framework " <<
> frameworkId
> +        << " from " << from << " because it is not from the registered"
> +        << " framework " << framework->pid;
>      }
>    }
>  }
> @@ -1013,15 +1024,22 @@ void Master::deactivateFramework(
>  {
>    Framework* framework = getFramework(frameworkId);
>
> -  if (framework != NULL) {
> -    if (framework->pid == from) {
> -      LOG(INFO) << from << " asked to deactivate framework " <<
> frameworkId;
> -      deactivate(framework);
> -    } else {
> -      LOG(WARNING) << from << " tried to deactivate framework; "
> -                   << "expecting " << framework->pid;
> -    }
> +  if (framework == NULL) {
> +    LOG(WARNING)
> +      << "Ignoring deactivate framework message for framework " <<
> frameworkId
> +      << " because the framework cannot be found";
> +    return;
>    }
> +
> +  if (from != framework->pid) {
> +    LOG(WARNING)
> +      << "Ignoring deactivate framework message for framework " <<
> frameworkId
> +      << " from '" << from << "' because it is not from the registered"
> +      << " framework '" << framework->pid << "'";
> +    return;
> +  }
> +
> +  deactivate(framework);
>  }
>
>
> @@ -1053,177 +1071,257 @@ void Master::deactivate(Framework* framework)
>  }
>
>
> -void Master::resourceRequest(const FrameworkID& frameworkId,
> -                             const vector<Request>& requests)
> +void Master::resourceRequest(
> +    const UPID& from,
> +    const FrameworkID& frameworkId,
> +    const vector<Request>& requests)
>  {
> +  Framework* framework = getFramework(frameworkId);
> +
> +  if (framework == NULL) {
> +    LOG(WARNING)
> +        << "Ignoring resource request message from framework " << frameworkId
> +        << " because the framework cannot be found";
> +    return;
> +  }
> +
> +  if (from != framework->pid) {
> +    LOG(WARNING)
> +      << "Ignoring resource request message from framework " << frameworkId
> +      << " from '" << from << "' because it is not from the registered "
> +      << " framework '" << framework->pid << "'";
> +    return;
> +  }
> +
> +  LOG(INFO) << "Requesting resources for framework " << frameworkId;
>    allocator->resourcesRequested(frameworkId, requests);
>  }
>
>
> -void Master::launchTasks(const FrameworkID& frameworkId,
> -                         const OfferID& offerId,
> -                         const vector<TaskInfo>& tasks,
> -                         const Filters& filters)
> +void Master::launchTasks(
> +    const UPID& from,
> +    const FrameworkID& frameworkId,
> +    const OfferID& offerId,
> +    const vector<TaskInfo>& tasks,
> +    const Filters& filters)
>  {
>    Framework* framework = getFramework(frameworkId);
> -  if (framework != NULL) {
> -    // TODO(benh): Support offer "hoarding" and allow multiple offers
> -    // *from the same slave* to be used to launch tasks. This can be
> -    // accomplished rather easily by collecting and merging all offers
> -    // into a mega-offer and passing that offer to
> -    // Master::processTasks.
> -    Offer* offer = getOffer(offerId);
> -    if (offer != NULL) {
> -      CHECK_EQ(offer->framework_id(), frameworkId)
> -          << "Offer " << offerId
> -          << " has invalid frameworkId " << offer->framework_id();
> -
> -      Slave* slave = getSlave(offer->slave_id());
> -      CHECK(slave != NULL)
> -        << "Offer " << offerId << " outlived  slave "
> -        << slave->id << " (" << slave->info.hostname() << ")";
>
> -      // If a slave is disconnected we should've removed its offers.
> -      CHECK(!slave->disconnected)
> -        << "Offer " << offerId << " outlived disconnected slave "
> -        << slave->id << " (" << slave->info.hostname() << ")";
> +  if (framework == NULL) {
> +    LOG(WARNING)
> +      << "Ignoring launch tasks message for offer " << offerId
> +      << " of framework " << frameworkId
> +      << " because the framework cannot be found";
> +    return;
> +  }
>
> -      processTasks(offer, framework, slave, tasks, filters);
> -    } else {
> -      // The offer is gone (possibly rescinded, lost slave, re-reply
> -      // to same offer, etc). Report all tasks in it as failed.
> -      // TODO: Consider adding a new task state TASK_INVALID for
> -      // situations like these.
> -      LOG(WARNING) << "Offer " << offerId << " is no longer valid";
> -      foreach (const TaskInfo& task, tasks) {
> -        StatusUpdateMessage message;
> -        StatusUpdate* update = message.mutable_update();
> -        update->mutable_framework_id()->MergeFrom(frameworkId);
> -        TaskStatus* status = update->mutable_status();
> -        status->mutable_task_id()->MergeFrom(task.task_id());
> -        status->set_state(TASK_LOST);
> -        status->set_message("Task launched with invalid offer");
> -        update->set_timestamp(Clock::now().secs());
> -        update->set_uuid(UUID::random().toBytes());
> -
> -        LOG(INFO) << "Sending status update " << *update
> -                  << " for launch task attempt on invalid offer " << offerId;
> -        send(framework->pid, message);
> -      }
> +  if (from != framework->pid) {
> +    LOG(WARNING)
> +      << "Ignoring launch tasks message for offer " << offerId
> +      << " of framework " << frameworkId << " from '" << from
> +      << "' because it is not from the registered framework '"
> +      << framework->pid << "'";
> +    return;
> +  }
> +
> +  // TODO(benh): Support offer "hoarding" and allow multiple offers
> +  // *from the same slave* to be used to launch tasks. This can be
> +  // accomplished rather easily by collecting and merging all offers
> +  // into a mega-offer and passing that offer to
> +  // Master::processTasks.
> +  Offer* offer = getOffer(offerId);
> +  if (offer != NULL) {
> +    CHECK_EQ(offer->framework_id(), frameworkId)
> +        << "Offer " << offerId
> +        << " has invalid frameworkId " << offer->framework_id();
> +
> +    Slave* slave = getSlave(offer->slave_id());
> +    CHECK(slave != NULL)
> +      << "Offer " << offerId << " outlived  slave "
> +      << slave->id << " (" << slave->info.hostname() << ")";
> +
> +    // If a slave is disconnected we should've removed its offers.
> +    CHECK(!slave->disconnected)
> +      << "Offer " << offerId << " outlived disconnected slave "
> +      << slave->id << " (" << slave->info.hostname() << ")";
> +
> +    processTasks(offer, framework, slave, tasks, filters);
> +  } else {
> +    // The offer is gone (possibly rescinded, lost slave, re-reply
> +    // to same offer, etc). Report all tasks in it as failed.
> +    // TODO: Consider adding a new task state TASK_INVALID for
> +    // situations like these.
> +    LOG(WARNING) << "Offer " << offerId << " is no longer valid";
> +    foreach (const TaskInfo& task, tasks) {
> +      StatusUpdateMessage message;
> +      StatusUpdate* update = message.mutable_update();
> +      update->mutable_framework_id()->MergeFrom(frameworkId);
> +      TaskStatus* status = update->mutable_status();
> +      status->mutable_task_id()->MergeFrom(task.task_id());
> +      status->set_state(TASK_LOST);
> +      status->set_message("Task launched with invalid offer");
> +      update->set_timestamp(Clock::now().secs());
> +      update->set_uuid(UUID::random().toBytes());
> +
> +      LOG(INFO) << "Sending status update " << *update
> +                << " for launch task attempt on invalid offer " << offerId;
> +      send(framework->pid, message);
>      }
>    }
>  }
>
>
> -void Master::reviveOffers(const FrameworkID& frameworkId)
> +void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
>  {
>    Framework* framework = getFramework(frameworkId);
> -  if (framework != NULL) {
> -    LOG(INFO) << "Reviving offers for framework " << framework->id;
> -    allocator->offersRevived(framework->id);
> +
> +  if (framework == NULL) {
> +    LOG(WARNING)
> +      << "Ignoring revive offers message for framework " << frameworkId
> +      << " because the framework cannot be found";
> +    return;
> +  }
> +
> +  if (from != framework->pid) {
> +    LOG(WARNING)
> +      << "Ignoring revive offers message for framework " << frameworkId
> +      << " from '" << from << "' because it is not from the registered"
> +      << " framework '" << framework->pid << "'";
> +    return;
>    }
> +
> +  LOG(INFO) << "Reviving offers for framework " << framework->id;
> +  allocator->offersRevived(framework->id);
>  }
>
>
> -void Master::killTask(const FrameworkID& frameworkId,
> -                      const TaskID& taskId)
> +void Master::killTask(
> +    const UPID& from,
> +    const FrameworkID& frameworkId,
> +    const TaskID& taskId)
>  {
>    LOG(INFO) << "Asked to kill task " << taskId
>              << " of framework " << frameworkId;
>
>    Framework* framework = getFramework(frameworkId);
> -  if (framework != NULL) {
> -    Task* task = framework->getTask(taskId);
> -    if (task != NULL) {
> -      Slave* slave = getSlave(task->slave_id());
> -      CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
> -
> -      // We add the task to 'killedTasks' here because the slave
> -      // might be partitioned or disconnected but the master
> -      // doesn't know it yet.
> -      slave->killedTasks.put(frameworkId, taskId);
> -
> -      // NOTE: This task will be properly reconciled when the
> -      // disconnected slave re-registers with the master.
> -      if (!slave->disconnected) {
> -        LOG(INFO) << "Telling slave " << slave->id << " ("
> -                  << slave->info.hostname() << ")"
> -                  << " to kill task " << taskId
> -                  << " of framework " << frameworkId;
> -
> -        KillTaskMessage message;
> -        message.mutable_framework_id()->MergeFrom(frameworkId);
> -        message.mutable_task_id()->MergeFrom(taskId);
> -        send(slave->pid, message);
> -      }
> -    } else {
> -      // TODO(benh): Once the scheduler has persistance and
> -      // high-availability of it's tasks, it will be the one that
> -      // determines that this invocation of 'killTask' is silly, and
> -      // can just return "locally" (i.e., after hitting only the other
> -      // replicas). Unfortunately, it still won't know the slave id.
> -
> -      LOG(WARNING) << "Cannot kill task " << taskId
> -                   << " of framework " << frameworkId
> -                   << " because it cannot be found";
> -      StatusUpdateMessage message;
> -      StatusUpdate* update = message.mutable_update();
> -      update->mutable_framework_id()->MergeFrom(frameworkId);
> -      TaskStatus* status = update->mutable_status();
> -      status->mutable_task_id()->MergeFrom(taskId);
> -      status->set_state(TASK_LOST);
> -      status->set_message("Task not found");
> -      update->set_timestamp(Clock::now().secs());
> -      update->set_uuid(UUID::random().toBytes());
> -      send(framework->pid, message);
> +
> +  if (framework == NULL) {
> +    LOG(WARNING)
> +      << "Ignoring kill task message for task " << taskId << " of framework "
> +      << frameworkId << " because the framework cannot be found";
> +    return;
> +  }
> +
> +  if (from != framework->pid) {
> +    LOG(WARNING)
> +      << "Ignoring kill task message for task " << taskId
> +      << " of framework " << frameworkId << " from '" << from
> +      << "' because it is not from the registered framework '"
> +      << framework->pid << "'";
> +    return;
> +  }
> +
> +  Task* task = framework->getTask(taskId);
> +  if (task != NULL) {
> +    Slave* slave = getSlave(task->slave_id());
> +    CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
> +
> +    // We add the task to 'killedTasks' here because the slave
> +    // might be partitioned or disconnected but the master
> +    // doesn't know it yet.
> +    slave->killedTasks.put(frameworkId, taskId);
> +
> +    // NOTE: This task will be properly reconciled when the
> +    // disconnected slave re-registers with the master.
> +    if (!slave->disconnected) {
> +      LOG(INFO) << "Telling slave " << slave->id << " ("
> +                << slave->info.hostname() << ")"
> +                << " to kill task " << taskId
> +                << " of framework " << frameworkId;
> +
> +      KillTaskMessage message;
> +      message.mutable_framework_id()->MergeFrom(frameworkId);
> +      message.mutable_task_id()->MergeFrom(taskId);
> +      send(slave->pid, message);
>      }
>    } else {
> -    LOG(WARNING) << "Failed to kill task " << taskId
> +    // TODO(benh): Once the scheduler has persistance and
> +    // high-availability of it's tasks, it will be the one that
> +    // determines that this invocation of 'killTask' is silly, and
> +    // can just return "locally" (i.e., after hitting only the other
> +    // replicas). Unfortunately, it still won't know the slave id.
> +
> +    LOG(WARNING) << "Cannot kill task " << taskId
>                   << " of framework " << frameworkId
> -                 << " because the framework cannot be found";
> +                 << " because the task cannot be found";
> +    StatusUpdateMessage message;
> +    StatusUpdate* update = message.mutable_update();
> +    update->mutable_framework_id()->MergeFrom(frameworkId);
> +    TaskStatus* status = update->mutable_status();
> +    status->mutable_task_id()->MergeFrom(taskId);
> +    status->set_state(TASK_LOST);
> +    status->set_message("Task not found");
> +    update->set_timestamp(Clock::now().secs());
> +    update->set_uuid(UUID::random().toBytes());
> +    send(framework->pid, message);
>    }
>  }
>
>
> -void Master::schedulerMessage(const SlaveID& slaveId,
> -                              const FrameworkID& frameworkId,
> -                              const ExecutorID& executorId,
> -                              const string& data)
> +void Master::schedulerMessage(
> +    const UPID& from,
> +    const SlaveID& slaveId,
> +    const FrameworkID& frameworkId,
> +    const ExecutorID& executorId,
> +    const string& data)
>  {
>    Framework* framework = getFramework(frameworkId);
> -  if (framework != NULL) {
> -    Slave* slave = getSlave(slaveId);
> -    if (slave != NULL) {
> -      if (!slave->disconnected) {
> -        LOG(INFO) << "Sending framework message for framework "
> -                  << frameworkId << " to slave " << slaveId
> -                  << " (" << slave->info.hostname() << ")";
> -
> -        FrameworkToExecutorMessage message;
> -        message.mutable_slave_id()->MergeFrom(slaveId);
> -        message.mutable_framework_id()->MergeFrom(frameworkId);
> -        message.mutable_executor_id()->MergeFrom(executorId);
> -        message.set_data(data);
> -        send(slave->pid, message);
>
> -        stats.validFrameworkMessages++;
> -      } else {
> -        LOG(WARNING) << "Cannot send framework message for framework "
> -                     << frameworkId << " to slave " << slaveId
> -                     << " (" << slave->info.hostname() << ")"
> -                     << " because slave is disconnected";
> -        stats.invalidFrameworkMessages++;
> -      }
> +  if (framework == NULL) {
> +    LOG(WARNING)
> +      << "Ignoring framework message for executor " << executorId
> +      << " of framework " << frameworkId
> +      << " because the framework cannot be found";
> +    stats.invalidFrameworkMessages++;
> +    return;
> +  }
> +
> +  if (from != framework->pid) {
> +    LOG(WARNING)
> +      << "Ignoring framework message for executor " << executorId
> +      << " of framework " << frameworkId << " from " << from
> +      << " because it is not from the registered framework "
> +      << framework->pid;
> +    stats.invalidFrameworkMessages++;
> +    return;
> +  }
> +
> +  Slave* slave = getSlave(slaveId);
> +  if (slave != NULL) {
> +    if (!slave->disconnected) {
> +      LOG(INFO) << "Sending framework message for framework "
> +                << frameworkId << " to slave " << slaveId
> +                << " (" << slave->info.hostname() << ")";
> +
> +      FrameworkToExecutorMessage message;
> +      message.mutable_slave_id()->MergeFrom(slaveId);
> +      message.mutable_framework_id()->MergeFrom(frameworkId);
> +      message.mutable_executor_id()->MergeFrom(executorId);
> +      message.set_data(data);
> +      send(slave->pid, message);
> +
> +      stats.validFrameworkMessages++;
>      } else {
>        LOG(WARNING) << "Cannot send framework message for framework "
>                     << frameworkId << " to slave " << slaveId
> -                   << " because slave does not exist";
> +                   << " (" << slave->info.hostname() << ")"
> +                   << " because slave is disconnected";
>        stats.invalidFrameworkMessages++;
>      }
>    } else {
>      LOG(WARNING) << "Cannot send framework message for framework "
>                   << frameworkId << " to slave " << slaveId
> -                 << " because framework does not exist";
> +                 << " because slave does not exist";
>      stats.invalidFrameworkMessages++;
>    }
>  }
> @@ -1707,6 +1805,11 @@ void Master::offer(const FrameworkID& frameworkId,
>  }
>
>
> +// TODO(vinod): If due to network partition there are two instances
> +// of the framework that think they are leaders and try to
> +// authenticate with master they would be stepping on each other's
> +// toes. Currently it is tricky to detect this case because the
> +// 'authenticate' message doesn't contain the 'FrameworkID'.
>  void Master::authenticate(const UPID& from, const UPID& pid)
>  {
>    // Deactivate the framework if it's already registered.
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.hpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.hpp b/src/master/master.hpp
> index c86c1f1..624c133 100644
> --- a/src/master/master.hpp
> +++ b/src/master/master.hpp
> @@ -110,19 +110,24 @@ public:
>        const process::UPID& from,
>        const FrameworkID& frameworkId);
>    void resourceRequest(
> +      const process::UPID& from,
>        const FrameworkID& frameworkId,
>        const std::vector<Request>& requests);
>    void launchTasks(
> +      const process::UPID& from,
>        const FrameworkID& frameworkId,
>        const OfferID& offerId,
>        const std::vector<TaskInfo>& tasks,
>        const Filters& filters);
>    void reviveOffers(
> +      const process::UPID& from,
>        const FrameworkID& frameworkId);
>    void killTask(
> +      const process::UPID& from,
>        const FrameworkID& frameworkId,
>        const TaskID& taskId);
>    void schedulerMessage(
> +      const process::UPID& from,
>        const SlaveID& slaveId,
>        const FrameworkID& frameworkId,
>        const ExecutorID& executorId,
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/tests/fault_tolerance_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
> index 6cb5829..687c981 100644
> --- a/src/tests/fault_tolerance_tests.cpp
> +++ b/src/tests/fault_tolerance_tests.cpp
> @@ -1379,6 +1379,127 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
>  }
>
>
> +// This test verifies that a partitioned framework that still
> +// thinks it is registered with the master cannot kill a task because
> +// the master has re-registered another instance of the framework.
> +// What this test does:
> +// 1. Launch a master, slave and scheduler.
> +// 2. Scheduler launches a task.
> +// 3. Launch a second failed over scheduler.
> +// 4. Make the first scheduler believe it is still registered.
> +// 5. First scheduler attempts to kill the task which is ignored by the master.
> +TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
> +{
> +  Try<PID<Master> > master = StartMaster();
> +  ASSERT_SOME(master);
> +
> +  MockExecutor exec(DEFAULT_EXECUTOR_ID);
> +
> +  Try<PID<Slave> > slave = StartSlave(&exec);
> +  ASSERT_SOME(slave);
> +
> +  // Start the first scheduler and launch a task.
> +  MockScheduler sched1;
> +  MesosSchedulerDriver driver1(
> +      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
> +
> +  FrameworkID frameworkId;
> +  EXPECT_CALL(sched1, registered(&driver1, _, _))
> +    .WillOnce(SaveArg<1>(&frameworkId));
> +
> +  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
> +    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
> +    .WillRepeatedly(Return()); // Ignore subsequent offers.
> +
> +  Future<TaskStatus> status;
> +  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
> +    .WillOnce(FutureArg<1>(&status));
> +
> +  ExecutorDriver* execDriver;
> +  EXPECT_CALL(exec, registered(_, _, _, _))
> +    .WillOnce(SaveArg<0>(&execDriver));
> +
> +  EXPECT_CALL(exec, launchTask(_, _))
> +    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
> +
> +  driver1.start();
> +
> +  AWAIT_READY(status);
> +  EXPECT_EQ(TASK_RUNNING, status.get().state());
> +
> +  // Now start the second failed over scheduler.
> +  MockScheduler sched2;
> +
> +  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
> +  framework2 = DEFAULT_FRAMEWORK_INFO;
> +  framework2.mutable_id()->MergeFrom(frameworkId);
> +
> +  MesosSchedulerDriver driver2(
> +      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
> +
> +  Future<Nothing> registered;
> +  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
> +    .WillOnce(FutureSatisfy(&registered));
> +
> +  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
> +    .WillRepeatedly(Return()); // Ignore any offers.
> +
> +  // Drop the framework error message from the master to simulate
> +  // a partitioned framework.
> +  Future<FrameworkErrorMessage> frameworkErrorMessage =
> +    DROP_PROTOBUF(FrameworkErrorMessage(), _ , _);
> +
> +  driver2.start();
> +
> +  AWAIT_READY(frameworkErrorMessage);
> +
> +  AWAIT_READY(registered);
> +
> +  // Now both the frameworks think they are registered with the
> +  // master, but the master only knows about the second framework.
> +
> +  // A 'killTask' by first framework should be dropped by the master.
> +  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
> +    .Times(0);
> +
> +  // 'TASK_FINISHED' by the executor should reach the second framework.
> +  Future<TaskStatus> status2;
> +  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
> +    .WillOnce(FutureArg<1>(&status2));
> +
> +  Future<KillTaskMessage> killTaskMessage =
> +    FUTURE_PROTOBUF(KillTaskMessage(), _, _);
> +
> +  driver1.killTask(status.get().task_id());
> +
> +  AWAIT_READY(killTaskMessage);
> +
> +  // By this point the master must have processed and ignored the
> +  // 'killTask' message from the first framework. To verify this,
> +  // the executor sends 'TASK_FINISHED' to ensure the only update
> +  // received by the scheduler is 'TASK_FINISHED' and not
> +  // 'TASK_KILLED'.
> +  TaskStatus finishedStatus;
> +  finishedStatus = status.get();
> +  finishedStatus.set_state(TASK_FINISHED);
> +  execDriver->sendStatusUpdate(finishedStatus);
> +
> +  AWAIT_READY(status2);
> +  EXPECT_EQ(TASK_FINISHED, status2.get().state());
> +
> +  EXPECT_CALL(exec, shutdown(_))
> +    .Times(AtMost(1));
> +
> +  driver1.stop();
> +  driver2.stop();
> +
> +  driver1.join();
> +  driver2.join();
> +
> +  Shutdown();
> +}
> +
> +
>  // This test checks that a scheduler exit shuts down the executor.
>  TEST_F(FaultToleranceTest, SchedulerExit)
>  {
>
>