You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/08/05 02:25:06 UTC

[2/3] mesos git commit: Fixed the master to recover resources/update state for orphan tasks.

Fixed the master to recover resources/update state for orphan tasks.

The master's status handler function used to ignore the status updates
from the agents for frameworks not yet re-connected with the master
upon a failover. This change modifies that logic to still update
the local state and not bail out early.



Branch: refs/heads/1.0.x
Commit: 6d18068b3d3576ae06d0058dd03b83a8c667db06
Parents: 3defabe
Author: Anand Mazumdar <>
Authored: Thu Aug 4 19:00:47 2016 -0700
Committer: Anand Mazumdar <>
Committed: Thu Aug 4 19:04:20 2016 -0700

 src/master/master.cpp      |  50 +++++-----
 src/tests/master_tests.cpp | 203 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 232 insertions(+), 21 deletions(-)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cf2cb58..8def6b3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5134,16 +5134,6 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
-  Framework* framework = getFramework(update.framework_id());
-  if (framework == nullptr) {
-    LOG(WARNING) << "Ignoring status update " << update
-                 << " from agent " << *slave
-                 << " because the framework is unknown";
-    metrics->invalid_status_updates++;
-    return;
-  }
   LOG(INFO) << "Status update " << update << " from agent " << *slave;
   // We ensure that the uuid of task status matches the update's uuid, in case
@@ -5155,8 +5145,21 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
-  // Forward the update to the framework.
-  forward(update, pid, framework);
+  bool validStatusUpdate = true;
+  Framework* framework = getFramework(update.framework_id());
+  // A framework might not have re-registered upon a master failover or
+  // got disconnected.
+  if (framework != nullptr && framework->connected) {
+    forward(update, pid, framework);
+  } else {
+    validStatusUpdate = false;
+    LOG(WARNING) << "Received status update " << update << " from agent "
+                 << *slave << " for "
+                 << (framework == nullptr ? "an unknown " : "a disconnected ")
+                 << "framework";
+  }
   // Lookup the task and see if we need to update anything locally.
   Task* task = slave->getTask(update.framework_id(), update.status().task_id());
@@ -5175,7 +5178,8 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
-  metrics->valid_status_updates++;
+  validStatusUpdate
+    ? metrics->valid_status_updates++ : metrics->invalid_status_updates++;
@@ -5195,6 +5199,18 @@ void Master::forward(
     LOG(INFO) << "Forwarding status update " << update;
+  // The task might not exist in master's memory (e.g., failed task validation).
+  Task* task = framework->getTask(update.status().task_id());
+  if (task != nullptr) {
+    // Set the status update state and uuid for the task. Note that
+    // master-generated updates are terminal and do not have a uuid
+    // (in which case the master also calls `removeTask()`).
+    if (update.has_uuid()) {
+      task->set_status_update_state(update.status().state());
+      task->set_status_update_uuid(update.status().uuid());
+    }
+  }
   StatusUpdateMessage message;
@@ -6807,14 +6823,6 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
-  // Set the status update state and uuid for the task. Note that
-  // master-generated updates are terminal and do not have a uuid
-  // (in which case the master also calls `removeTask()`).
-  if (update.has_uuid()) {
-    task->set_status_update_state(status.state());
-    task->set_status_update_uuid(update.uuid());
-  }
   // TODO(brenden): Consider wiping the `message` field?
   if (task->statuses_size() > 0 &&
       task->statuses(task->statuses_size() - 1).state() == status.state()) {
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 6709818..b1d7545 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -86,6 +86,9 @@ using mesos::master::contender::MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
 using mesos::master::detector::MasterDetector;
 using mesos::master::detector::StandaloneMasterDetector;
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
 using process::Clock;
 using process::Future;
 using process::Owned;
@@ -4658,6 +4661,206 @@ TEST_F(MasterTest, RejectFrameworkWithInvalidFailoverTimeout)
+// This test verifies that we recover resources when an orphaned task reaches
+// a terminal state.
+TEST_F(MasterTest, RecoverResourcesOrphanedTask)
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
+  ASSERT_SOME(slave);
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillOnce(Return());
+  ContentType contentType = ContentType::PROTOBUF;
+  scheduler::TestV1Mesos mesos(master.get()->pid, contentType, scheduler);
+  AWAIT_READY(connected);
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    mesos.send(call);
+  }
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId = subscribed.get().framework_id();
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  v1::executor::Mesos* execMesos = nullptr;
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+  EXPECT_CALL(*executor, subscribed(_, _))
+    .WillOnce(SaveArg<0>(&execMesos));
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(executor::SendUpdateFromTask(
+        frameworkId, evolve(executorId), v1::TASK_RUNNING));
+  Future<Nothing> acknowledged;
+  EXPECT_CALL(*executor, acknowledged(_, _))
+    .WillOnce(FutureSatisfy(&acknowledged));
+  Future<Event::Update> update;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update));
+  const v1::Offer& offer = offers->offers(0);
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+    mesos.send(call);
+  }
+  AWAIT_READY(acknowledged);
+  AWAIT_READY(update);
+  EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+  EXPECT_TRUE(update->status().has_executor_id());
+  EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+  // Failover the master.
+  master->reset();
+  master = StartMaster();
+  ASSERT_SOME(master);
+  AWAIT_READY(disconnected);
+  // Have the agent re-register with the master.
+  detector.appoint(master.get()->pid);
+  // Ensure re-registration is complete.
+  Clock::pause();
+  Clock::settle();
+  EXPECT_CALL(*executor, acknowledged(_, _));
+  Future<v1::executor::Call> updateCall =
+    FUTURE_HTTP_CALL(v1::executor::Call(),
+                     v1::executor::Call::UPDATE,
+                     _,
+                     contentType);
+  // Send a terminal status update while the task is an orphan i.e., the
+  // framework has not reconnected.
+  {
+    v1::TaskStatus status;
+    status.mutable_task_id()->CopyFrom(taskInfo.task_id());
+    status.mutable_executor_id()->CopyFrom(evolve(executorId));
+    status.set_state(v1::TASK_FINISHED);
+    status.set_source(v1::TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(UUID::random().toBytes());
+    v1::executor::Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(evolve(executorId));
+    call.set_type(v1::executor::Call::UPDATE);
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+    execMesos->send(call);
+  }
+  AWAIT_READY(updateCall);
+  // Ensure that the update is processed by the agent.
+  Clock::settle();
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+  // Advance the clock for the status update manager to retry with the
+  // latest state of the task.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+  // Ensure that the resources are successfully recovered.
+  AWAIT_READY(recoverResources);
+  // Ensure that the state of the task is updated to `TASK_FINISHED` on the
+  // master.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_TASKS);
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    v1::master::Response::GetTasks tasks = deserialize<v1::master::Response>(
+        contentType, response->body).get().get_tasks();
+    ASSERT_TRUE(tasks.IsInitialized());
+    ASSERT_EQ(1, tasks.orphan_tasks().size());
+    ASSERT_EQ(TASK_FINISHED, tasks.orphan_tasks(0).state());
+  }
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {