You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/11/21 00:54:20 UTC

[5/6] mesos git commit: Added the "task" prefix to the name of the status update manager files.

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
deleted file mode 100644
index d180e8c..0000000
--- a/src/tests/status_update_manager_tests.cpp
+++ /dev/null
@@ -1,852 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include <string>
-#include <vector>
-
-#include <gmock/gmock.h>
-
-#include <mesos/executor.hpp>
-#include <mesos/scheduler.hpp>
-
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/gmock.hpp>
-#include <process/owned.hpp>
-#include <process/pid.hpp>
-
-#include <stout/none.hpp>
-#include <stout/result.hpp>
-#include <stout/try.hpp>
-
-#include "master/master.hpp"
-
-#include "slave/constants.hpp"
-#include "slave/paths.hpp"
-#include "slave/slave.hpp"
-#include "slave/state.hpp"
-
-#include "messages/messages.hpp"
-
-#include "tests/containerizer.hpp"
-#include "tests/mesos.hpp"
-
-using mesos::internal::master::Master;
-
-using mesos::internal::slave::Slave;
-
-using mesos::master::detector::MasterDetector;
-
-using process::Clock;
-using process::Future;
-using process::Owned;
-using process::PID;
-
-using std::string;
-using std::vector;
-
-using testing::_;
-using testing::AtMost;
-using testing::Return;
-using testing::SaveArg;
-
-namespace mesos {
-namespace internal {
-namespace tests {
-
-
-// TODO(benh): Move this into utils, make more generic, and use in
-// other tests.
-vector<TaskInfo> createTasks(const Offer& offer)
-{
-  TaskInfo task;
-  task.set_name("test-task");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offer.slave_id());
-  task.mutable_resources()->MergeFrom(offer.resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  return tasks;
-}
-
-
-class TaskStatusUpdateManagerTest: public MesosTest {};
-
-
-TEST_F_TEMP_DISABLED_ON_WINDOWS(
-    TaskStatusUpdateManagerTest, CheckpointStatusUpdate)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  // Require flags to retrieve work_dir when recovering
-  // the checkpointed data.
-  slave::Flags flags = CreateSlaveFlags();
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  Try<Owned<cluster::Slave>> slave =
-    StartSlave(detector.get(), &containerizer, flags);
-  ASSERT_SOME(slave);
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  Future<FrameworkID> frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(FutureArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(frameworkId);
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  EXPECT_CALL(exec, registered(_, _, _, _));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
-
-  AWAIT_READY(_statusUpdateAcknowledgement);
-
-  // Ensure that both the status update and its acknowledgement are
-  // correctly checkpointed.
-  Result<slave::state::State> state =
-    slave::state::recover(slave::paths::getMetaRootDir(flags.work_dir), true);
-
-  ASSERT_SOME(state);
-  ASSERT_SOME(state->slave);
-  ASSERT_TRUE(state->slave->frameworks.contains(frameworkId.get()));
-
-  slave::state::FrameworkState frameworkState =
-    state->slave->frameworks.get(frameworkId.get()).get();
-
-  ASSERT_EQ(1u, frameworkState.executors.size());
-
-  slave::state::ExecutorState executorState =
-    frameworkState.executors.begin()->second;
-
-  ASSERT_EQ(1u, executorState.runs.size());
-
-  slave::state::RunState runState = executorState.runs.begin()->second;
-
-  ASSERT_EQ(1u, runState.tasks.size());
-
-  slave::state::TaskState taskState = runState.tasks.begin()->second;
-
-  EXPECT_EQ(1u, taskState.updates.size());
-  EXPECT_EQ(1u, taskState.acks.size());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
-TEST_F(TaskStatusUpdateManagerTest, RetryStatusUpdate)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  slave::Flags flags = CreateSlaveFlags();
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  Try<Owned<cluster::Slave>> slave =
-    StartSlave(detector.get(), &containerizer, flags);
-  ASSERT_SOME(slave);
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(_, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  EXPECT_CALL(exec, registered(_, _, _, _));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    DROP_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
-
-  Clock::pause();
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(statusUpdateMessage);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_RUNNING, status->state());
-
-  Clock::resume();
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that status update manager ignores
-// duplicate ACK for an earlier update when it is waiting
-// for an ACK for a later update. This could happen when the
-// duplicate ACK is for a retried update.
-TEST_F(TaskStatusUpdateManagerTest, IgnoreDuplicateStatusUpdateAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-      .WillOnce(SaveArg<0>(&execDriver));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  // Drop the first update, so that status update manager
-  // resends the update.
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    DROP_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
-
-  Clock::pause();
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(statusUpdateMessage);
-  StatusUpdate update = statusUpdateMessage->update();
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // This is the ACK for the retried update.
-  Future<Nothing> ack =
-    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_RUNNING, status->state());
-
-  AWAIT_READY(ack);
-
-  // Now send TASK_FINISHED update so that the status update manager
-  // is waiting for its ACK, which it never gets because we drop the
-  // update.
-  DROP_PROTOBUFS(StatusUpdateMessage(), master.get()->pid, _);
-
-  Future<Nothing> update2 = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
-
-  TaskStatus status2 = status.get();
-  status2.set_state(TASK_FINISHED);
-
-  execDriver->sendStatusUpdate(status2);
-
-  AWAIT_READY(update2);
-
-  // This is to catch the duplicate ack for TASK_RUNNING.
-  Future<Nothing> duplicateAck =
-      FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
-
-  // Now send a duplicate ACK for the TASK_RUNNING update.
-  process::dispatch(
-      slave.get()->pid,
-      &Slave::statusUpdateAcknowledgement,
-      master.get()->pid,
-      update.slave_id(),
-      frameworkId,
-      update.status().task_id(),
-      update.uuid());
-
-  AWAIT_READY(duplicateAck);
-
-  Clock::resume();
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that status update manager ignores
-// unexpected ACK for an earlier update when it is waiting
-// for an ACK for another update. We do this by dropping ACKs
-// for the original update and sending a random ACK to the slave.
-TEST_F(TaskStatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-      .WillOnce(SaveArg<0>(&execDriver));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    FUTURE_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
-
-  // Drop the ACKs, so that status update manager
-  // retries the update.
-  DROP_CALLS(mesos::scheduler::Call(),
-             mesos::scheduler::Call::ACKNOWLEDGE,
-             _,
-             master.get()->pid);
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(statusUpdateMessage);
-  StatusUpdate update = statusUpdateMessage->update();
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_RUNNING, status->state());
-
-  Future<Nothing> unexpectedAck =
-      FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
-
-  // Now send an ACK with a random UUID.
-  process::dispatch(
-      slave.get()->pid,
-      &Slave::statusUpdateAcknowledgement,
-      master.get()->pid,
-      update.slave_id(),
-      frameworkId,
-      update.status().task_id(),
-      UUID::random().toBytes());
-
-  AWAIT_READY(unexpectedAck);
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that the slave and status update manager
-// properly handle duplicate terminal status updates, when the
-// second update is received after the ACK for the first update.
-// The proper behavior here is for the status update manager to
-// forward the duplicate update to the scheduler.
-TEST_F(TaskStatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  slave::Flags flags = CreateSlaveFlags();
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  Try<Owned<cluster::Slave>> slave =
-    StartSlave(detector.get(), &containerizer, flags);
-  ASSERT_SOME(slave);
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  // Send a terminal update right away.
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_FINISHED, status->state());
-
-  AWAIT_READY(_statusUpdateAcknowledgement);
-
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  Future<Nothing> _statusUpdateAcknowledgement2 =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
-  Clock::pause();
-
-  // Now send a TASK_KILLED update for the same task.
-  TaskStatus status2 = status.get();
-  status2.set_state(TASK_KILLED);
-  execDriver->sendStatusUpdate(status2);
-
-  // Ensure the scheduler receives TASK_KILLED.
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_KILLED, update->state());
-
-  // Ensure the slave properly handles the ACK.
-  // Clock::settle() ensures that the slave successfully
-  // executes Slave::_statusUpdateAcknowledgement().
-  AWAIT_READY(_statusUpdateAcknowledgement2);
-  Clock::settle();
-
-  Clock::resume();
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that the slave and status update manager
-// properly handle duplicate status updates, when the second
-// update with the same UUID is received before the ACK for the
-// first update. The proper behavior here is for the status update
-// manager to drop the duplicate update.
-TEST_F(TaskStatusUpdateManagerTest, DuplicateUpdateBeforeAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  // Capture the first status update message.
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Drop the first ACK from the scheduler to the slave.
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
-
-  Clock::pause();
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(statusUpdateMessage);
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_RUNNING, status->state());
-
-  AWAIT_READY(statusUpdateAckMessage);
-
-  Future<Nothing> ___statusUpdate =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  // Now resend the TASK_RUNNING update.
-  process::post(slave.get()->pid, statusUpdateMessage.get());
-
-  // At this point the status update manager has handled
-  // the duplicate status update.
-  AWAIT_READY(___statusUpdate);
-
-  // After we advance the clock, the status update manager should
-  // retry the TASK_RUNNING update and the scheduler should receive
-  // and acknowledge it.
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-  Clock::settle();
-
-  // Ensure the scheduler receives TASK_FINISHED.
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_RUNNING, update->state());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  Clock::resume();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that the status update manager correctly includes
-// the latest state of the task in status update.
-TEST_F(TaskStatusUpdateManagerTest, LatestTaskState)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(_, _, _));
-
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  // Signal when the first update is dropped.
-  Future<StatusUpdateMessage> statusUpdateMessage =
-    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);
-
-  Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
-
-  driver.start();
-
-  // Wait until TASK_RUNNING is sent to the master.
-  AWAIT_READY(statusUpdateMessage);
-
-  // Ensure the status update manager handles the TASK_RUNNING update.
-  AWAIT_READY(___statusUpdate);
-
-  // Pause the clock to avoid status update manager from retrying.
-  Clock::pause();
-
-  Future<Nothing> ___statusUpdate2 =
-    FUTURE_DISPATCH(_, &Slave::___statusUpdate);
-
-  // Now send TASK_FINISHED update.
-  TaskStatus finishedStatus;
-  finishedStatus = statusUpdateMessage->update().status();
-  finishedStatus.set_state(TASK_FINISHED);
-  execDriver->sendStatusUpdate(finishedStatus);
-
-  // Ensure the status update manager handles the TASK_FINISHED update.
-  AWAIT_READY(___statusUpdate2);
-
-  // Signal when the second update is dropped.
-  Future<StatusUpdateMessage> statusUpdateMessage2 =
-    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);
-
-  // Advance the clock for the status update manager to send a retry.
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-
-  AWAIT_READY(statusUpdateMessage2);
-
-  // The update should correspond to TASK_RUNNING.
-  ASSERT_EQ(TASK_RUNNING, statusUpdateMessage2->update().status().state());
-
-  // The update should include TASK_FINISHED as the latest state.
-  ASSERT_EQ(TASK_FINISHED,
-            statusUpdateMessage2->update().latest_state());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that if master receives a status update
-// for an already terminated task it forwards it without
-// changing the state of the task.
-TEST_F(TaskStatusUpdateManagerTest, DuplicatedTerminalStatusUpdate)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  // Send a terminal update right away.
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_FINISHED, status->state());
-
-  AWAIT_READY(_statusUpdateAcknowledgement);
-
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  Future<Nothing> _statusUpdateAcknowledgement2 =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
-  Clock::pause();
-
-  // Now send a TASK_KILLED update for the same task.
-  TaskStatus status2 = status.get();
-  status2.set_state(TASK_KILLED);
-  execDriver->sendStatusUpdate(status2);
-
-  // Ensure the scheduler receives TASK_KILLED.
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_KILLED, update->state());
-
-  // Ensure the slave properly handles the ACK.
-  // Clock::settle() ensures that the slave successfully
-  // executes Slave::_statusUpdateAcknowledgement().
-  AWAIT_READY(_statusUpdateAcknowledgement2);
-
-  // Verify the latest task status.
-  Future<process::http::Response> tasks = process::http::get(
-      master.get()->pid,
-      "tasks",
-      None(),
-      createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, tasks);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", tasks);
-
-  Try<JSON::Object> parse = JSON::parse<JSON::Object>(tasks->body);
-  ASSERT_SOME(parse);
-
-  Result<JSON::String> state = parse->find<JSON::String>("tasks[0].state");
-
-  ASSERT_SOME_EQ(JSON::String("TASK_FINISHED"), state);
-
-  Clock::resume();
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-} // namespace tests {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/task_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/task_status_update_manager_tests.cpp b/src/tests/task_status_update_manager_tests.cpp
new file mode 100644
index 0000000..d180e8c
--- /dev/null
+++ b/src/tests/task_status_update_manager_tests.cpp
@@ -0,0 +1,852 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include <stout/none.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "master/master.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+#include "slave/state.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using mesos::master::detector::MasterDetector;
+
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::PID;
+
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+using testing::SaveArg;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+
+// TODO(benh): Move this into utils, make more generic, and use in
+// other tests.
+vector<TaskInfo> createTasks(const Offer& offer)
+{
+  TaskInfo task;
+  task.set_name("test-task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  return tasks;
+}
+
+
+class TaskStatusUpdateManagerTest: public MesosTest {};
+
+
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    TaskStatusUpdateManagerTest, CheckpointStatusUpdate)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  // Require flags to retrieve work_dir when recovering
+  // the checkpointed data.
+  slave::Flags flags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave);
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  // Ensure that both the status update and its acknowledgement are
+  // correctly checkpointed.
+  Result<slave::state::State> state =
+    slave::state::recover(slave::paths::getMetaRootDir(flags.work_dir), true);
+
+  ASSERT_SOME(state);
+  ASSERT_SOME(state->slave);
+  ASSERT_TRUE(state->slave->frameworks.contains(frameworkId.get()));
+
+  slave::state::FrameworkState frameworkState =
+    state->slave->frameworks.get(frameworkId.get()).get();
+
+  ASSERT_EQ(1u, frameworkState.executors.size());
+
+  slave::state::ExecutorState executorState =
+    frameworkState.executors.begin()->second;
+
+  ASSERT_EQ(1u, executorState.runs.size());
+
+  slave::state::RunState runState = executorState.runs.begin()->second;
+
+  ASSERT_EQ(1u, runState.tasks.size());
+
+  slave::state::TaskState taskState = runState.tasks.begin()->second;
+
+  EXPECT_EQ(1u, taskState.updates.size());
+  EXPECT_EQ(1u, taskState.acks.size());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+TEST_F(TaskStatusUpdateManagerTest, RetryStatusUpdate)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave);
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that status update manager ignores
+// duplicate ACK for an earlier update when it is waiting
+// for an ACK for a later update. This could happen when the
+// duplicate ACK is for a retried update.
+TEST_F(TaskStatusUpdateManagerTest, IgnoreDuplicateStatusUpdateAck)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+      .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Drop the first update, so that status update manager
+  // resends the update.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+  StatusUpdate update = statusUpdateMessage->update();
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // This is the ACK for the retried update.
+  Future<Nothing> ack =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  AWAIT_READY(ack);
+
+  // Now send TASK_FINISHED update so that the status update manager
+  // is waiting for its ACK, which it never gets because we drop the
+  // update.
+  DROP_PROTOBUFS(StatusUpdateMessage(), master.get()->pid, _);
+
+  Future<Nothing> update2 = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_FINISHED);
+
+  execDriver->sendStatusUpdate(status2);
+
+  AWAIT_READY(update2);
+
+  // This is to catch the duplicate ack for TASK_RUNNING.
+  Future<Nothing> duplicateAck =
+      FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  // Now send a duplicate ACK for the TASK_RUNNING update.
+  process::dispatch(
+      slave.get()->pid,
+      &Slave::statusUpdateAcknowledgement,
+      master.get()->pid,
+      update.slave_id(),
+      frameworkId,
+      update.status().task_id(),
+      update.uuid());
+
+  AWAIT_READY(duplicateAck);
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that status update manager ignores
+// unexpected ACK for an earlier update when it is waiting
+// for an ACK for another update. We do this by dropping ACKs
+// for the original update and sending a random ACK to the slave.
+TEST_F(TaskStatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+      .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
+
+  // Drop the ACKs, so that status update manager
+  // retries the update.
+  DROP_CALLS(mesos::scheduler::Call(),
+             mesos::scheduler::Call::ACKNOWLEDGE,
+             _,
+             master.get()->pid);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+  StatusUpdate update = statusUpdateMessage->update();
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  Future<Nothing> unexpectedAck =
+      FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  // Now send an ACK with a random UUID.
+  process::dispatch(
+      slave.get()->pid,
+      &Slave::statusUpdateAcknowledgement,
+      master.get()->pid,
+      update.slave_id(),
+      frameworkId,
+      update.status().task_id(),
+      UUID::random().toBytes());
+
+  AWAIT_READY(unexpectedAck);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates, when the
+// second update is received after the ACK for the first update.
+// The proper behavior here is for the status update manager to
+// forward the duplicate update to the scheduler.
+TEST_F(TaskStatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave);
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status->state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Clock::pause();
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // Ensure the scheduler receives TASK_KILLED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_KILLED, update->state());
+
+  // Ensure the slave properly handles the ACK.
+  // Clock::settle() ensures that the slave successfully
+  // executes Slave::_statusUpdateAcknowledgement().
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+  Clock::settle();
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate status updates, when the second
+// update with the same UUID is received before the ACK for the
+// first update. The proper behavior here is for the status update
+// manager to drop the duplicate update.
+TEST_F(TaskStatusUpdateManagerTest, DuplicateUpdateBeforeAck)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture the first status update message.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  AWAIT_READY(statusUpdateAckMessage);
+
+  Future<Nothing> ___statusUpdate =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
+
+  // Now resend the TASK_RUNNING update.
+  process::post(slave.get()->pid, statusUpdateMessage.get());
+
+  // At this point the status update manager has handled
+  // the duplicate status update.
+  AWAIT_READY(___statusUpdate);
+
+  // After we advance the clock, the status update manager should
+  // retry the TASK_RUNNING update and the scheduler should receive
+  // and acknowledge it.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Ensure the scheduler receives TASK_FINISHED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_RUNNING, update->state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that the status update manager correctly includes
+// the latest state of the task in status update.
+TEST_F(TaskStatusUpdateManagerTest, LatestTaskState)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Signal when the first update is dropped.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);
+
+  Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
+
+  driver.start();
+
+  // Wait until TASK_RUNNING is sent to the master.
+  AWAIT_READY(statusUpdateMessage);
+
+  // Ensure the status update manager handles the TASK_RUNNING update.
+  AWAIT_READY(___statusUpdate);
+
+  // Pause the clock to avoid status update manager from retrying.
+  Clock::pause();
+
+  Future<Nothing> ___statusUpdate2 =
+    FUTURE_DISPATCH(_, &Slave::___statusUpdate);
+
+  // Now send TASK_FINISHED update.
+  TaskStatus finishedStatus;
+  finishedStatus = statusUpdateMessage->update().status();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure the status update manager handles the TASK_FINISHED update.
+  AWAIT_READY(___statusUpdate2);
+
+  // Signal when the second update is dropped.
+  Future<StatusUpdateMessage> statusUpdateMessage2 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);
+
+  // Advance the clock for the status update manager to send a retry.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(statusUpdateMessage2);
+
+  // The update should correspond to TASK_RUNNING.
+  ASSERT_EQ(TASK_RUNNING, statusUpdateMessage2->update().status().state());
+
+  // The update should include TASK_FINISHED as the latest state.
+  ASSERT_EQ(TASK_FINISHED,
+            statusUpdateMessage2->update().latest_state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that if master receives a status update
+// for an already terminated task it forwards it without
+// changing the state of the task.
+TEST_F(TaskStatusUpdateManagerTest, DuplicatedTerminalStatusUpdate)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status->state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Clock::pause();
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // Ensure the scheduler receives TASK_KILLED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_KILLED, update->state());
+
+  // Ensure the slave properly handles the ACK.
+  // Clock::settle() ensures that the slave successfully
+  // executes Slave::_statusUpdateAcknowledgement().
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+
+  // Verify the latest task status.
+  Future<process::http::Response> tasks = process::http::get(
+      master.get()->pid,
+      "tasks",
+      None(),
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, tasks);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", tasks);
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(tasks->body);
+  ASSERT_SOME(parse);
+
+  Result<JSON::String> state = parse->find<JSON::String>("tasks[0].state");
+
+  ASSERT_SOME_EQ(JSON::String("TASK_FINISHED"), state);
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {