You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/10/09 03:30:10 UTC

[1/3] git commit: Introduced Master <-> Slave reconciliation.

Repository: mesos
Updated Branches:
  refs/heads/master b810250fa -> c96ba8f60


Introduced Master <-> Slave reconciliation.

Review: https://reviews.apache.org/r/26206


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc4444eb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc4444eb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc4444eb

Branch: refs/heads/master
Commit: cc4444eb47705649a4e93a4045b4d130dd4b3354
Parents: b810250
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 11:31:53 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 18:09:04 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp               | 95 +++++++++++++++++++-------------
 src/master/master.hpp               |  5 +-
 src/messages/messages.proto         |  2 +
 src/slave/slave.cpp                 | 79 +++++++++++++++++++++-----
 src/slave/slave.hpp                 |  7 ++-
 src/tests/fault_tolerance_tests.cpp | 12 +++-
 6 files changed, 142 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc48b96..cb46cec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3092,21 +3092,7 @@ void Master::reregisterSlave(
     // For now, we assume this slave is not nefarious (eventually
     // this will be handled by orthogonal security measures like key
     // based authentication).
-    LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
-                 << ") is being allowed to re-register with an already"
-                 << " in use id (" << slave->id << ")";
-
-    // TODO(bmahler): There's an implicit assumption here that when
-    // the master already knows about this slave, the slave cannot
-    // have tasks unknown to the master. This _should_ be the case
-    // since the causal relationship is:
-    //   slave removes task -> master removes task
-    // We should enforce this via a CHECK (dangerous), or by shutting
-    // down slaves that are found to violate this assumption.
-
-    SlaveReregisteredMessage message;
-    message.mutable_slave_id()->MergeFrom(slave->id);
-    send(from, message);
+    LOG(INFO) << "Re-registering slave " << *slave;
 
     // Update the slave pid and relink to it.
     // NOTE: Re-linking the slave here always rather than only when
@@ -3119,8 +3105,8 @@ void Master::reregisterSlave(
     link(slave->pid);
 
     // Reconcile tasks between master and the slave.
-    // NOTE: This needs to be done after the registration message is
-    // sent to the slave and the new pid is linked.
+    // NOTE: This sends the re-registered message, including tasks
+    // that need to be reconciled by the slave.
     reconcile(slave, executorInfos, tasks);
 
     // If this is a disconnected slave, add it back to the allocator.
@@ -3871,44 +3857,79 @@ void Master::reconcile(
 {
   CHECK_NOTNULL(slave);
 
+  // TODO(bmahler): There's an implicit assumption here that the slave
+  // cannot have tasks unknown to the master. This _should_ be the
+  // case since the causal relationship is:
+  //   slave removes task -> master removes task
+  // Add error logging for any violations of this assumption!
+
   // We convert the 'tasks' into a map for easier lookup below.
-  // TODO(vinod): Check if the tasks are known to the master.
   multihashmap<FrameworkID, TaskID> slaveTasks;
   foreach (const Task& task, tasks) {
     slaveTasks.put(task.framework_id(), task.task_id());
   }
 
-  // Send TASK_LOST updates for tasks present in the master but
-  // missing from the slave. This could happen if the task was
-  // dropped by the slave (e.g., slave exited before getting the
-  // task or the task was launched while slave was in recovery).
-  // NOTE: copies are needed because removeTask modifies slave->tasks.
+  // Look for tasks missing in the slave's re-registration message.
+  // This can occur when:
+  //   (1) a launch message was dropped (e.g. slave failed over), or
+  //   (2) the slave re-registration raced with a launch message, in
+  //       which case the slave actually received the task.
+  // To resolve both cases correctly, we must reconcile through the
+  // slave. For slaves that do not support reconciliation, we keep
+  // the old semantics and cover only case (1) via TASK_LOST.
+  SlaveReregisteredMessage reregistered;
+  reregistered.mutable_slave_id()->MergeFrom(slave->id);
+
+  // NOTE: copies are needed because removeTask modifies slave->tasks.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+    ReconcileTasksMessage reconcile;
+    reconcile.mutable_framework_id()->CopyFrom(frameworkId);
+
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
         LOG(WARNING) << "Task " << task->task_id()
                      << " of framework " << task->framework_id()
                      << " unknown to the slave " << *slave
-                     << " during re-registration";
-
-        const StatusUpdate& update = protobuf::createStatusUpdate(
-            task->framework_id(),
-            slave->id,
-            task->task_id(),
-            TASK_LOST,
-            "Task is unknown to the slave");
+                     << " during re-registration"
+                     << (slave->version.isSome()
+                         ? ": reconciling with the slave"
+                         : ": sending TASK_LOST");
+
+        if (slave->version.isSome()) {
+          TaskStatus* status = reconcile.add_statuses();
+          status->mutable_task_id()->CopyFrom(task->task_id());
+          status->mutable_slave_id()->CopyFrom(slave->id);
+          status->set_state(task->state());
+          status->set_message("Reconciliation request");
+          status->set_timestamp(Clock::now().secs());
+        } else {
+          // TODO(bmahler): Remove this case in 0.22.0.
+          const StatusUpdate& update = protobuf::createStatusUpdate(
+              task->framework_id(),
+              slave->id,
+              task->task_id(),
+              TASK_LOST,
+              "Task is unknown to the slave");
+
+          updateTask(task, update.status());
+          removeTask(task);
 
-        updateTask(task, update.status());
-        removeTask(task);
-
-        Framework* framework = getFramework(frameworkId);
-        if (framework != NULL) {
-          forward(update, UPID(), framework);
+          Framework* framework = getFramework(frameworkId);
+          if (framework != NULL) {
+            forward(update, UPID(), framework);
+          }
         }
       }
     }
+
+    if (slave->version.isSome() && reconcile.statuses_size() > 0) {
+      reregistered.add_reconciliations()->CopyFrom(reconcile);
+    }
   }
 
+  // Re-register the slave.
+  send(slave->pid, reregistered);
+
   // Likewise, any executors that are present in the master but
   // not present in the slave must be removed to correctly account
   // for resources. First we index the executors for fast lookup below.

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 49589f4..14f1d0f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -292,9 +292,8 @@ protected:
   // Invoked when the contender has entered the contest.
   void contended(const process::Future<process::Future<Nothing> >& candidacy);
 
-  // Reconciles a re-registering slave's tasks / executors and sends
-  // TASK_LOST updates for tasks known to the master but unknown to
-  // the slave.
+  // Handles a known re-registering slave by reconciling the master's
+  // view of the slave's tasks and executors.
   void reconcile(
       Slave* slave,
       const std::vector<ExecutorInfo>& executors,

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index edf1e4e..77515d9 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -259,6 +259,8 @@ message SlaveRegisteredMessage {
 
 message SlaveReregisteredMessage {
   required SlaveID slave_id = 1;
+
+  repeated ReconcileTasksMessage reconciliations = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 809b008..cb37599 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -347,7 +347,8 @@ void Slave::initialize()
 
   install<SlaveReregisteredMessage>(
       &Slave::reregistered,
-      &SlaveReregisteredMessage::slave_id);
+      &SlaveReregisteredMessage::slave_id,
+      &SlaveReregisteredMessage::reconciliations);
 
   install<RunTaskMessage>(
       &Slave::runTask,
@@ -810,7 +811,10 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
 }
 
 
-void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
+void Slave::reregistered(
+    const UPID& from,
+    const SlaveID& slaveId,
+    const vector<ReconcileTasksMessage>& reconciliations)
 {
   if (master != from) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
@@ -823,25 +827,15 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
     case DISCONNECTED:
       CHECK_SOME(master);
       LOG(INFO) << "Re-registered with master " << master.get();
-
       state = RUNNING;
-      if (!(info.id() == slaveId)) {
-        EXIT(1) << "Re-registered but got wrong id: " << slaveId
-                << "(expected: " << info.id() << "). Committing suicide";
-      }
       break;
     case RUNNING:
-      // Already re-registered!
-      if (!(info.id() == slaveId)) {
-        EXIT(1) << "Re-registered but got wrong id: " << slaveId
-                << "(expected: " << info.id() << "). Committing suicide";
-      }
       CHECK_SOME(master);
       LOG(WARNING) << "Already re-registered with master " << master.get();
       break;
     case TERMINATING:
       LOG(WARNING) << "Ignoring re-registration because slave is terminating";
-      break;
+      return;
     case RECOVERING:
       // It's possible to receive a message intended for the previous
       // run of the slave here. Short term we can leave this as is and
@@ -851,7 +845,64 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
       // https://issues.apache.org/jira/browse/MESOS-677
     default:
       LOG(FATAL) << "Unexpected slave state " << state;
-      break;
+      return;;
+  }
+
+  if (!(info.id() == slaveId)) {
+    EXIT(1) << "Re-registered but got wrong id: " << slaveId
+            << "(expected: " << info.id() << "). Committing suicide";
+  }
+
+  // Reconcile any tasks per the master's request.
+  foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
+    Framework* framework = getFramework(reconcile.framework_id());
+
+    foreach (const TaskStatus& status, reconcile.statuses()) {
+      const TaskID& taskId = status.task_id();
+
+      bool known = false;
+
+      // Try to locate the task.
+      if (framework != NULL) {
+        foreachkey (const ExecutorID& executorId, framework->pending) {
+          if (framework->pending[executorId].contains(taskId)) {
+            known = true;
+          }
+        }
+        foreachvalue (Executor* executor, framework->executors) {
+          if (executor->queuedTasks.contains(taskId) ||
+              executor->launchedTasks.contains(taskId) ||
+              executor->terminatedTasks.contains(taskId)) {
+            known = true;
+          }
+        }
+      }
+
+      // We only need to send a TASK_LOST update when the task is
+      // unknown (so that the master removes it). Otherwise, the
+      // master correctly holds the task and will receive updates.
+      if (!known) {
+        LOG(WARNING) << "Slave reconciling task " << taskId
+                     << " of framework " << reconcile.framework_id()
+                     << " in state TASK_LOST: task unknown to the slave";
+
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+            reconcile.framework_id(),
+            info.id(),
+            taskId,
+            TASK_LOST,
+            "Reconciliation: task unknown to the slave");
+
+        // NOTE: We can't use statusUpdate() here because it drops
+        // updates for unknown frameworks.
+        statusUpdateManager->update(update, info.id())
+          .onAny(defer(self(),
+                       &Slave::__statusUpdate,
+                       lambda::_1,
+                       update,
+                       UPID()));
+      }
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2869710..76d505c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -97,7 +97,12 @@ public:
   void shutdown(const process::UPID& from, const std::string& message);
 
   void registered(const process::UPID& from, const SlaveID& slaveId);
-  void reregistered(const process::UPID& from, const SlaveID& slaveId);
+
+  void reregistered(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const std::vector<ReconcileTasksMessage>& reconciliations);
+
   void doReliableRegistration(const Duration& duration);
 
   void runTask(

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index e8f5322..c34a9d6 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -2233,9 +2233,9 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
 }
 
 
-// This test verifies that the master sends TASK_LOST updates
-// for tasks in the master absent from the re-registered slave.
-// We do this by dropping RunTaskMessage from master to the slave.
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we drop the
+// RunTaskMessage so the slave should send TASK_LOST.
 TEST_F(FaultToleranceTest, ReconcileLostTasks)
 {
   Try<PID<Master> > master = StartMaster();
@@ -2285,6 +2285,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));
@@ -2295,6 +2298,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
 
   AWAIT_READY(slaveReregisteredMessage);
 
+  // Make sure the slave generated the TASK_LOST.
+  AWAIT_READY(statusUpdateMessage);
+
   AWAIT_READY(status);
 
   ASSERT_EQ(task.task_id(), status.get().task_id());


[3/3] git commit: Added a test for the Master <-> Slave reconciliation race.

Posted by bm...@apache.org.
Added a test for the Master <-> Slave reconciliation race.

Review: https://reviews.apache.org/r/26208


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c96ba8f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c96ba8f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c96ba8f6

Branch: refs/heads/master
Commit: c96ba8f6035329acebb25ca0f52215284bbf8f8f
Parents: cd03dfa
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 15:52:00 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 18:09:06 2014 -0700

----------------------------------------------------------------------
 src/tests/master_slave_reconciliation_tests.cpp | 119 +++++++++++++++++++
 1 file changed, 119 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c96ba8f6/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 0e703ab..04806ed 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -237,6 +237,125 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileLostTask)
 }
 
 
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we trigger
+// a race between the slave re-registration message and the launch
+// message. There should be no TASK_LOST.
+// This was motivated by MESOS-1696.
+TEST_F(MasterSlaveReconciliationTest, ReconcileRace)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  StandaloneMasterDetector detector(master.get());
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Trigger a re-registration of the slave and capture the message
+  // so that we can spoof a race with a launch task message.
+  DROP_PROTOBUFS(ReregisterSlaveMessage(), slave.get(), master.get());
+
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    DROP_PROTOBUF(ReregisterSlaveMessage(), slave.get(), master.get());
+
+  detector.appoint(master.get());
+
+  AWAIT_READY(reregisterSlaveMessage);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("test task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  ExecutorDriver* executorDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&executorDriver));
+
+  // Leave the task in TASK_STAGING.
+  Future<Nothing> launchTask;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureSatisfy(&launchTask));
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(launchTask);
+
+  // Send the stale re-registration message, which does not contain
+  // the task we just launched. This will trigger a reconciliation
+  // by the master.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Prevent this from being dropped per the DROP_PROTOBUFS above.
+  FUTURE_PROTOBUF(ReregisterSlaveMessage(), slave.get(), master.get());
+
+  process::post(slave.get(), master.get(), reregisterSlaveMessage.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Neither the master nor the slave should send a TASK_LOST
+  // as part of the reconciliation. We check this by calling
+  // Clock::settle() to flush all pending events.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  // Now send TASK_FINISHED and make sure it's the only message
+  // received by the scheduler.
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  TaskStatus taskStatus;
+  taskStatus.mutable_task_id()->CopyFrom(task.task_id());
+  taskStatus.set_state(TASK_FINISHED);
+  executorDriver->sendStatusUpdate(taskStatus);
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
 // This test verifies that the slave reports pending tasks when
 // re-registering, otherwise the master will report them as being
 // lost.


[2/3] git commit: Split out partition and reconciliation tests from FaultTolerance tests.

Posted by bm...@apache.org.
Split out partition and reconciliation tests from FaultTolerance tests.

Review: https://reviews.apache.org/r/26207


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cd03dfa6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cd03dfa6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cd03dfa6

Branch: refs/heads/master
Commit: cd03dfa6e4022bc17fa9dd0599a17f391ce7978d
Parents: cc4444e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 12:18:28 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 18:09:06 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   1 +
 src/tests/fault_tolerance_tests.cpp             | 769 +------------------
 src/tests/master_slave_reconciliation_tests.cpp | 401 ++++++++++
 src/tests/partition_tests.cpp                   | 363 ++++++++-
 4 files changed, 800 insertions(+), 734 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index fb12b3e..d503c8d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1166,6 +1166,7 @@ mesos_tests_SOURCES =				\
   tests/main.cpp				\
   tests/master_authorization_tests.cpp		\
   tests/master_contender_detector_tests.cpp	\
+  tests/master_slave_reconciliation_tests.cpp	\
   tests/master_tests.cpp			\
   tests/mesos.cpp				\
   tests/module_tests.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index c34a9d6..a75910d 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -16,13 +16,8 @@
  * limitations under the License.
  */
 
-#include <stdint.h>
-#include <unistd.h>
-
 #include <gmock/gmock.h>
 
-#include <map>
-#include <string>
 #include <vector>
 
 #include <mesos/executor.hpp>
@@ -45,6 +40,7 @@
 #include "master/allocator.hpp"
 #include "master/master.hpp"
 
+#include "slave/constants.hpp"
 #include "slave/slave.hpp"
 
 #include "tests/containerizer.hpp"
@@ -58,7 +54,6 @@ using namespace mesos::internal::tests;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
-using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_MIN;
 
 using process::Clock;
 using process::Future;
@@ -70,15 +65,12 @@ using process::UPID;
 using process::http::OK;
 using process::http::Response;
 
-using std::string;
-using std::map;
 using std::vector;
 
 using testing::_;
 using testing::AnyOf;
 using testing::AtMost;
 using testing::DoAll;
-using testing::ElementsAre;
 using testing::Eq;
 using testing::Not;
 using testing::Return;
@@ -134,224 +126,7 @@ TEST_F(FaultToleranceTest, SlaveLost)
 }
 
 
-// This test checks that a scheduler gets a slave lost
-// message for a partioned slave.
-TEST_F(FaultToleranceTest, PartitionedSlave)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  // Set these expectations up before we spawn the slave so that we
-  // don't miss the first PING.
-  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-
-  // Drop all the PONGs to simulate slave partition.
-  DROP_MESSAGES(Eq("PONG"), _, _);
-
-  Try<PID<Slave> > slave = StartSlave();
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<Nothing> resourceOffers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureSatisfy(&resourceOffers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  // Need to make sure the framework AND slave have registered with
-  // master. Waiting for resource offers should accomplish both.
-  AWAIT_READY(resourceOffers);
-
-  Clock::pause();
-
-  EXPECT_CALL(sched, offerRescinded(&driver, _))
-    .Times(AtMost(1));
-
-  Future<Nothing> slaveLost;
-  EXPECT_CALL(sched, slaveLost(&driver, _))
-    .WillOnce(FutureSatisfy(&slaveLost));
-
-  // Now advance through the PINGs.
-  uint32_t pings = 0;
-  while (true) {
-    AWAIT_READY(ping);
-    pings++;
-    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
-     break;
-    }
-    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-    Clock::advance(master::SLAVE_PING_TIMEOUT);
-  }
-
-  Clock::advance(master::SLAVE_PING_TIMEOUT);
-
-  AWAIT_READY(slaveLost);
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-
-  Clock::resume();
-}
-
-
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to re-register, we deny the
-// re-registration by sending a ShutdownMessage to the slave.
-// Why? Because during a network partition, the master will remove a
-// partitioned slave, thus sending its tasks to LOST. At this point,
-// when the partition is removed, the slave will attempt to
-// re-register with its running tasks. We've already notified
-// frameworks that these tasks were LOST, so we have to have the slave
-// slave shut down.
-TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  // Allow the master to PING the slave, but drop all PONG messages
-  // from the slave. Note that we don't match on the master / slave
-  // PIDs because it's actually the SlaveObserver Process that sends
-  // the pings.
-  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-  DROP_MESSAGES(Eq("PONG"), _, _);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
-  StandaloneMasterDetector detector(master.get());
-
-  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_NE(0u, offers.get().size());
-
-  // Launch a task. This is to ensure the task is killed by the slave,
-  // during shutdown.
-  TaskID taskId;
-  taskId.set_value("1");
-
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->MergeFrom(taskId);
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-  task.mutable_executor()->mutable_command()->set_value("sleep 60");
-
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  // Set up the expectations for launching the task.
-  EXPECT_CALL(exec, registered(_, _, _, _));
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<TaskStatus> runningStatus;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&runningStatus));
-
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
-      slave.get(), &Slave::_statusUpdateAcknowledgement);
-
-  driver.launchTasks(offers.get()[0].id(), tasks);
-
-  AWAIT_READY(runningStatus);
-  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
-
-  // Wait for the slave to have handled the acknowledgment prior
-  // to pausing the clock.
-  AWAIT_READY(statusUpdateAck);
-
-  // Drop the first shutdown message from the master (simulated
-  // partition), allow the second shutdown message to pass when
-  // the slave re-registers.
-  Future<ShutdownMessage> shutdownMessage =
-    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
-
-  Future<TaskStatus> lostStatus;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&lostStatus));
-
-  Future<Nothing> slaveLost;
-  EXPECT_CALL(sched, slaveLost(&driver, _))
-    .WillOnce(FutureSatisfy(&slaveLost));
-
-  Clock::pause();
-
-  // Now, induce a partition of the slave by having the master
-  // timeout the slave.
-  uint32_t pings = 0;
-  while (true) {
-    AWAIT_READY(ping);
-    pings++;
-    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
-     break;
-    }
-    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-    Clock::advance(master::SLAVE_PING_TIMEOUT);
-    Clock::settle();
-  }
-
-  Clock::advance(master::SLAVE_PING_TIMEOUT);
-  Clock::settle();
-
-  // The master will have notified the framework of the lost task.
-  AWAIT_READY(lostStatus);
-  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
-
-  // Wait for the master to attempt to shut down the slave.
-  AWAIT_READY(shutdownMessage);
-
-  // The master will notify the framework that the slave was lost.
-  AWAIT_READY(slaveLost);
-
-  Clock::resume();
-
-  // We now complete the partition on the slave side as well. This
-  // is done by simulating a master loss event which would normally
-  // occur during a network partition.
-  detector.appoint(None());
-
-  Future<Nothing> shutdown;
-  EXPECT_CALL(exec, shutdown(_))
-    .WillOnce(FutureSatisfy(&shutdown));
-
-  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
-
-  // Have the slave re-register with the master.
-  detector.appoint(master.get());
-
-  // Upon re-registration, the master will shutdown the slave.
-  // The slave will then shut down the executor.
-  AWAIT_READY(shutdownMessage);
-  AWAIT_READY(shutdown);
-
-  driver.stop();
-  driver.join();
 
-  Shutdown();
-}
 
 
 // The purpose of this test is to ensure that when slaves are removed
@@ -465,196 +240,60 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
 }
 
 
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to send exited executor messages,
-// we send a ShutdownMessage to the slave. Why? Because during a
-// network partition, the master will remove a partitioned slave, thus
-// sending its tasks to LOST. At this point, when the partition is
-// removed, the slave may attempt to send exited executor messages if
-// it was unaware that the master removed it. We've already
-// notified frameworks that the tasks under the executors were LOST,
-// so we have to have the slave shut down.
-TEST_F(FaultToleranceTest, PartitionedSlaveExitedExecutor)
+// This test ensures that a framework connecting with a
+// failed over master gets a registered callback.
+// Note that this behavior might change in the future and
+// the scheduler might receive a re-registered callback.
+TEST_F(FaultToleranceTest, MasterFailover)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  // Allow the master to PING the slave, but drop all PONG messages
-  // from the slave. Note that we don't match on the master / slave
-  // PIDs because it's actually the SlaveObserver Process that sends
-  // the pings.
-  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-  DROP_MESSAGES(Eq("PONG"), _, _);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Try<PID<Slave> > slave = StartSlave(&containerizer);
-  ASSERT_SOME(slave);
-
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector);
 
-  Future<FrameworkID> frameworkId;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureArg<1>(&frameworkId));\
+  Future<process::Message> frameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
 
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
+  Future<Nothing> registered1;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered1));
 
   driver.start();
 
-  AWAIT_READY(frameworkId);
-  AWAIT_READY(offers);
-  ASSERT_NE(0u, offers.get().size());
+  AWAIT_READY(frameworkRegisteredMessage);
 
-  // Launch a task. This allows us to have the slave send an
-  // ExitedExecutorMessage.
-  TaskID taskId;
-  taskId.set_value("1");
+  AWAIT_READY(registered1);
 
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->MergeFrom(taskId);
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-  task.mutable_executor()->mutable_command()->set_value("sleep 60");
+  // Simulate failed over master by restarting the master.
+  Stop(master.get());
+  master = StartMaster();
+  ASSERT_SOME(master);
 
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
+  EXPECT_CALL(sched, disconnected(&driver));
 
-  // Set up the expectations for launching the task.
-  EXPECT_CALL(exec, registered(_, _, _, _));
+  Future<AuthenticateMessage> authenticateMessage =
+    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
 
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+  Future<Nothing> registered2;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered2));
 
-  // Drop all the status updates from the slave, so that we can
-  // ensure the ExitedExecutorMessage is what triggers the slave
-  // shutdown.
-  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+  // Simulate a new master detected message to the scheduler.
+  detector.appoint(master.get());
 
-  driver.launchTasks(offers.get()[0].id(), tasks);
+  // Scheduler should retry authentication.
+  AWAIT_READY(authenticateMessage);
 
-  // Drop the first shutdown message from the master (simulated
-  // partition) and allow the second shutdown message to pass when
-  // triggered by the ExitedExecutorMessage.
-  Future<ShutdownMessage> shutdownMessage =
-    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+  // Framework should get a registered callback.
+  AWAIT_READY(registered2);
 
-  Future<TaskStatus> lostStatus;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&lostStatus));
+  driver.stop();
+  driver.join();
 
-  Future<Nothing> slaveLost;
-  EXPECT_CALL(sched, slaveLost(&driver, _))
-    .WillOnce(FutureSatisfy(&slaveLost));
-
-  Clock::pause();
-
-  // Now, induce a partition of the slave by having the master
-  // timeout the slave.
-  uint32_t pings = 0;
-  while (true) {
-    AWAIT_READY(ping);
-    pings++;
-    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
-     break;
-    }
-    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-    Clock::advance(master::SLAVE_PING_TIMEOUT);
-    Clock::settle();
-  }
-
-  Clock::advance(master::SLAVE_PING_TIMEOUT);
-  Clock::settle();
-
-  // The master will have notified the framework of the lost task.
-  AWAIT_READY(lostStatus);
-  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
-
-  // Wait for the master to attempt to shut down the slave.
-  AWAIT_READY(shutdownMessage);
-
-  // The master will notify the framework that the slave was lost.
-  AWAIT_READY(slaveLost);
-
-  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
-
-  // Induce an ExitedExecutorMessage from the slave.
-  containerizer.destroy(
-      frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());
-
-  // Upon receiving the message, the master will shutdown the slave.
-  AWAIT_READY(shutdownMessage);
-
-  Clock::resume();
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// This test ensures that a framework connecting with a
-// failed over master gets a registered callback.
-// Note that this behavior might change in the future and
-// the scheduler might receive a re-registered callback.
-TEST_F(FaultToleranceTest, MasterFailover)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockScheduler sched;
-  StandaloneMasterDetector detector(master.get());
-  TestingMesosSchedulerDriver driver(&sched, &detector);
-
-  Future<process::Message> frameworkRegisteredMessage =
-    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
-
-  Future<Nothing> registered1;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureSatisfy(&registered1));
-
-  driver.start();
-
-  AWAIT_READY(frameworkRegisteredMessage);
-
-  AWAIT_READY(registered1);
-
-  // Simulate failed over master by restarting the master.
-  Stop(master.get());
-  master = StartMaster();
-  ASSERT_SOME(master);
-
-  EXPECT_CALL(sched, disconnected(&driver));
-
-  Future<AuthenticateMessage> authenticateMessage =
-    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
-
-  Future<Nothing> registered2;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureSatisfy(&registered2));
-
-  // Simulate a new master detected message to the scheduler.
-  detector.appoint(master.get());
-
-  // Scheduler should retry authentication.
-  AWAIT_READY(authenticateMessage);
-
-  // Framework should get a registered callback.
-  AWAIT_READY(registered2);
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
+  Shutdown();
+}
 
 
 // This test ensures that a failed over master recovers completed tasks
@@ -1454,7 +1093,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
 
   // Now advance time enough for the reliable timeout
   // to kick in and another status update is sent.
-  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
 
   AWAIT_READY(statusUpdate);
 
@@ -2045,98 +1684,6 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
 }
 
 
-// This test verifies that a re-registering slave sends the terminal
-// unacknowledged tasks for a terminal executor. This is required
-// for the master to correctly reconcile it's view with the slave's
-// view of tasks. This test drops a terminal update to the master
-// and then forces the slave to re-register.
-TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  StandaloneMasterDetector detector(master.get());
-
-  Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  Future<FrameworkID> frameworkId;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureArg<1>(&frameworkId));
-
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
-    = FUTURE_PROTOBUF(
-        StatusUpdateAcknowledgementMessage(), master.get(), slave.get());
-
-  driver.start();
-
-  AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status.get().state());
-
-  // Make sure the acknowledgement reaches the slave.
-  AWAIT_READY(statusUpdateAcknowledgementMessage);
-
-  // Drop the TASK_FINISHED status update sent to the master.
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
-
-  Future<ExitedExecutorMessage> executorExitedMessage =
-    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
-
-  TaskStatus finishedStatus;
-  finishedStatus = status.get();
-  finishedStatus.set_state(TASK_FINISHED);
-  execDriver->sendStatusUpdate(finishedStatus);
-
-  // Ensure the update was sent.
-  AWAIT_READY(statusUpdateMessage);
-
-  // Now kill the executor.
-  containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
-
-  Future<TaskStatus> status2;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status2));
-
-  // We drop the 'UpdateFrameworkMessage' from the master to slave to
-  // stop the status update manager from retrying the update that was
-  // already sent due to the new master detection.
-  DROP_PROTOBUFS(UpdateFrameworkMessage(), _, _);
-
-  detector.appoint(master.get());
-
-  AWAIT_READY(status2);
-  EXPECT_EQ(TASK_FINISHED, status2.get().state());
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
 // This test ensures that a master properly handles the
 // re-registration of a framework when an empty executor is present
 // on a slave. This was added to prevent regressions on MESOS-1821.
@@ -2233,250 +1780,6 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
 }
 
 
-// This test verifies that the master reconciles tasks that are
-// missing from a re-registering slave. In this case, we drop the
-// RunTaskMessage so the slave should send TASK_LOST.
-TEST_F(FaultToleranceTest, ReconcileLostTasks)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  StandaloneMasterDetector detector(master.get());
-
-  Try<PID<Slave> > slave = StartSlave(&detector);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-
-  EXPECT_NE(0u, offers.get().size());
-
-  TaskInfo task;
-  task.set_name("test task");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  // We now launch a task and drop the corresponding RunTaskMessage on
-  // the slave, to ensure that only the master knows about this task.
-  Future<RunTaskMessage> runTaskMessage =
-    DROP_PROTOBUF(RunTaskMessage(), _, _);
-
-  driver.launchTasks(offers.get()[0].id(), tasks);
-
-  AWAIT_READY(runTaskMessage);
-
-  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
-    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Simulate a spurious master change event (e.g., due to ZooKeeper
-  // expiration) at the slave to force re-registration.
-  detector.appoint(master.get());
-
-  AWAIT_READY(slaveReregisteredMessage);
-
-  // Make sure the slave generated the TASK_LOST.
-  AWAIT_READY(statusUpdateMessage);
-
-  AWAIT_READY(status);
-
-  ASSERT_EQ(task.task_id(), status.get().task_id());
-  ASSERT_EQ(TASK_LOST, status.get().state());
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// This test verifies that the slave reports pending tasks when
-// re-registering, otherwise the master will report them as being
-// lost.
-TEST_F(FaultToleranceTest, ReconcilePendingTasks)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  StandaloneMasterDetector detector(master.get());
-
-  Try<PID<Slave> > slave = StartSlave(&detector);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  // No TASK_LOST updates should occur!
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  // We drop the _runTask dispatch to ensure the task remains
-  // pending in the slave.
-  Future<Nothing> _runTask = DROP_DISPATCH(slave.get(), &Slave::_runTask);
-
-  TaskInfo task1;
-  task1.set_name("test task");
-  task1.mutable_task_id()->set_value("1");
-  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task1.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task1.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
-  vector<TaskInfo> tasks1;
-  tasks1.push_back(task1);
-
-  driver.launchTasks(offers.get()[0].id(), tasks1);
-
-  AWAIT_READY(_runTask);
-
-  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
-    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
-  // Simulate a spurious master change event (e.g., due to ZooKeeper
-  // expiration) at the slave to force re-registration.
-  detector.appoint(master.get());
-
-  AWAIT_READY(slaveReregisteredMessage);
-
-  Clock::pause();
-  Clock::settle();
-  Clock::resume();
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
-// This test verifies that when the slave re-registers, the master
-// does not send TASK_LOST update for a task that has reached terminal
-// state but is waiting for an acknowledgement.
-TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
-  StandaloneMasterDetector detector(master.get());
-
-  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  TaskInfo task;
-  task.set_name("test task");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  EXPECT_CALL(exec, registered(_, _, _, _));
-
-  // Send a terminal update right away.
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
-  // Drop the status update from slave to the master, so that
-  // the slave has a pending terminal update when it re-registers.
-  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
-
-  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status))
-    .WillRepeatedly(Return()); // Ignore retried update due to update framework.
-
-  driver.launchTasks(offers.get()[0].id(), tasks);
-
-  AWAIT_READY(_statusUpdate);
-
-  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
-    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
-  // Simulate a spurious master change event (e.g., due to ZooKeeper
-  // expiration) at the slave to force re-registration.
-  detector.appoint(master.get());
-
-  AWAIT_READY(slaveReregisteredMessage);
-
-  // The master should not send a TASK_LOST after the slave
-  // re-registers. We check this by calling Clock::settle() so that
-  // the only update the scheduler receives is the retried
-  // TASK_FINISHED update.
-  // NOTE: The status update manager resends the status update when
-  // it detects a new master.
-  Clock::pause();
-  Clock::settle();
-
-  AWAIT_READY(status);
-  ASSERT_EQ(TASK_FINISHED, status.get().state());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
-
-
 // This test ensures that if a master incorrectly thinks that it is
 // leading, the scheduler driver will drop messages from this master.
 // Unfortunately, it is not currently possible to start more than one

http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
new file mode 100644
index 0000000..0e703ab
--- /dev/null
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "master/allocator.hpp"
+#include "master/master.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::protobuf;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::Message;
+using process::PID;
+
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+using testing::SaveArg;
+
+
+class MasterSlaveReconciliationTest : public MesosTest {};
+
+
+// This test verifies that a re-registering slave sends the terminal
+// unacknowledged tasks for a terminal executor. This is required
+// for the master to correctly reconcile its view with the slave's
+// view of tasks. This test drops a terminal update to the master
+// and then forces the slave to re-register.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminatedExecutor)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
+    = FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(), master.get(), slave.get());
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Make sure the acknowledgement reaches the slave.
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+  // Drop the TASK_FINISHED status update sent to the master.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<ExitedExecutorMessage> executorExitedMessage =
+    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+
+  TaskStatus finishedStatus;
+  finishedStatus = status.get();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure the update was sent.
+  AWAIT_READY(statusUpdateMessage);
+
+  // Now kill the executor.
+  containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  // We drop the 'UpdateFrameworkMessage' from the master to slave to
+  // stop the status update manager from retrying the update that was
+  // already sent due to the new master detection.
+  DROP_PROTOBUFS(UpdateFrameworkMessage(), _, _);
+
+  detector.appoint(master.get());
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we drop the
+// RunTaskMessage so the slave should send TASK_LOST.
+TEST_F(MasterSlaveReconciliationTest, ReconcileLostTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("test task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // We now launch a task and drop the corresponding RunTaskMessage on
+  // the slave, to ensure that only the master knows about this task.
+  Future<RunTaskMessage> runTaskMessage =
+    DROP_PROTOBUF(RunTaskMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(runTaskMessage);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Make sure the slave generated the TASK_LOST.
+  AWAIT_READY(statusUpdateMessage);
+
+  AWAIT_READY(status);
+
+  ASSERT_EQ(task.task_id(), status.get().task_id());
+  ASSERT_EQ(TASK_LOST, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that the slave reports pending tasks when
+// re-registering, otherwise the master will report them as being
+// lost.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterPendingTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // No TASK_LOST updates should occur!
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
+
+  // We drop the _runTask dispatch to ensure the task remains
+  // pending in the slave.
+  Future<Nothing> _runTask = DROP_DISPATCH(slave.get(), &Slave::_runTask);
+
+  TaskInfo task1;
+  task1.set_name("test task");
+  task1.mutable_task_id()->set_value("1");
+  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task1.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task1.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks1;
+  tasks1.push_back(task1);
+
+  driver.launchTasks(offers.get()[0].id(), tasks1);
+
+  AWAIT_READY(_runTask);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that when the slave re-registers, the master
+// does not send TASK_LOST update for a task that has reached terminal
+// state but is waiting for an acknowledgement.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminalTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("test task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  // Drop the status update from slave to the master, so that
+  // the slave has a pending terminal update when it re-registers.
+  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore retried update due to update framework.
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(_statusUpdate);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The master should not send a TASK_LOST after the slave
+  // re-registers. We check this by calling Clock::settle() so that
+  // the only update the scheduler receives is the retried
+  // TASK_FINISHED update.
+  // NOTE: The status update manager resends the status update when
+  // it detects a new master.
+  Clock::pause();
+  Clock::settle();
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 8136a95..0dc1a92 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -18,7 +18,7 @@
 
 #include <gmock/gmock.h>
 
-#include <string>
+#include <vector>
 
 #include <process/clock.hpp>
 #include <process/future.hpp>
@@ -33,6 +33,7 @@
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
@@ -51,13 +52,373 @@ using process::Future;
 using process::Message;
 using process::PID;
 
+using std::vector;
+
 using testing::_;
+using testing::AtMost;
 using testing::Eq;
+using testing::Return;
 
 
 class PartitionTest : public MesosTest {};
 
 
+// This test checks that a scheduler receives a slaveLost() callback
+// for a partitioned slave, i.e., a slave whose PONGs are all dropped.
+TEST_F(PartitionTest, PartitionedSlave)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Install the PING future before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  // Drop all the PONGs to simulate a partitioned slave.
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<Nothing> resourceOffers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureSatisfy(&resourceOffers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Need to make sure the framework AND slave have registered with
+  // master. Waiting for resource offers should accomplish both.
+  AWAIT_READY(resourceOffers);
+
+  Clock::pause();
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(AtMost(1));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  // Step the clock until MAX_SLAVE_PING_TIMEOUTS PINGs have been observed.
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+     break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+
+  AWAIT_READY(slaveLost);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  Clock::resume();
+}
+
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master, and then attempt to re-register, we deny the
+// re-registration by sending a ShutdownMessage to the slave.
+// Why? Because during a network partition, the master will remove a
+// partitioned slave, thus sending its tasks to LOST. At this point,
+// when the partition is removed, the slave will attempt to
+// re-register with its running tasks. We've already notified
+// frameworks that these tasks were LOST, so we have to have the
+// slave shut down.
+TEST_F(PartitionTest, PartitionedSlaveReregistration)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the SlaveObserver Process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  // Launch a task so that there is something for the slave to kill
+  // during shutdown.
+  TaskID taskId;
+  taskId.set_value("1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->MergeFrom(taskId);
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Set up the expectations for launching the task.
+  EXPECT_CALL(exec, registered(_, _, _, _));
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+      slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+
+  // Wait for the slave to have handled the acknowledgment prior
+  // to pausing the clock.
+  AWAIT_READY(statusUpdateAck);
+
+  // Drop the first shutdown message from the master (simulated
+  // partition), allow the second shutdown message to pass when
+  // the slave re-registers.
+  Future<ShutdownMessage> shutdownMessage =
+    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  Future<TaskStatus> lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  Clock::pause();
+
+  // Now, induce a partition of the slave by having the master
+  // time out the slave.
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+     break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+    Clock::settle();
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // The master will have notified the framework of the lost task.
+  AWAIT_READY(lostStatus);
+  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+  // Wait for the master to attempt to shut down the slave.
+  AWAIT_READY(shutdownMessage);
+
+  // The master will notify the framework that the slave was lost.
+  AWAIT_READY(slaveLost);
+
+  Clock::resume();
+
+  // We now complete the partition on the slave side as well. This
+  // is done by simulating a master loss event which would normally
+  // occur during a network partition.
+  detector.appoint(None());
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+
+  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  // Have the slave re-register with the master.
+  detector.appoint(master.get());
+
+  // Upon re-registration, the master will shut down the slave.
+  // The slave will then shut down the executor.
+  AWAIT_READY(shutdownMessage);
+  AWAIT_READY(shutdown);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master, and then attempt to send exited executor messages,
+// we send a ShutdownMessage to the slave. Why? Because during a
+// network partition, the master will remove a partitioned slave, thus
+// sending its tasks to LOST. At this point, when the partition is
+// removed, the slave may attempt to send exited executor messages if
+// it was unaware that the master removed it. We've already
+// notified frameworks that the tasks under the executors were LOST,
+// so we have to have the slave shut down.
+TEST_F(PartitionTest, PartitionedSlaveExitedExecutor)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the SlaveObserver Process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  // Launch a task. This allows us to have the slave send an
+  // ExitedExecutorMessage.
+  TaskID taskId;
+  taskId.set_value("1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->MergeFrom(taskId);
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Set up the expectations for launching the task.
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Drop all the status updates from the slave, so that we can
+  // ensure the ExitedExecutorMessage is what triggers the slave
+  // shutdown.
+  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Drop the first shutdown message from the master (simulated
+  // partition) and allow the second shutdown message to pass when
+  // triggered by the ExitedExecutorMessage.
+  Future<ShutdownMessage> shutdownMessage =
+    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  Future<TaskStatus> lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  Clock::pause();
+
+  // Now, induce a partition of the slave by having the master
+  // time out the slave.
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+     break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+    Clock::settle();
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // The master will have notified the framework of the lost task.
+  AWAIT_READY(lostStatus);
+  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+  // Wait for the master to attempt to shut down the slave.
+  AWAIT_READY(shutdownMessage);
+
+  // The master will notify the framework that the slave was lost.
+  AWAIT_READY(slaveLost);
+
+  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  // Induce an ExitedExecutorMessage from the slave.
+  containerizer.destroy(
+      frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());
+
+  // Upon receiving the message, the master will shut down the slave.
+  AWAIT_READY(shutdownMessage);
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
 // This test verifies that if master --> slave socket closes and the
 // slave is not aware of it (i.e., one way network partition), slave
 // will re-register with the master.