You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/05 02:01:38 UTC

[1/2] mesos git commit: Fixed the master to recover resources/update state for orphan tasks.

Repository: mesos
Updated Branches:
  refs/heads/master b406c4a67 -> 1e2187f8b


Fixed the master to recover resources/update state for orphan tasks.

Previously, the master's status update handler ignored status updates
from agents for frameworks that had not yet re-registered with the
master after a failover (or were disconnected). With this change the
master no longer bails out early: it skips forwarding the update to the
framework but still updates its local task state.

Review: https://reviews.apache.org/r/50723/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1e2187f8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1e2187f8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1e2187f8

Branch: refs/heads/master
Commit: 1e2187f8b3fdb640a744591852bae079c0daa530
Parents: 4097421
Author: Anand Mazumdar <an...@apache.org>
Authored: Thu Aug 4 19:00:47 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Thu Aug 4 19:01:07 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp      |  50 +++++-----
 src/tests/master_tests.cpp | 203 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 232 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1e2187f8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 17d5c58..9259509 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5138,16 +5138,6 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
     return;
   }
 
-  Framework* framework = getFramework(update.framework_id());
-
-  if (framework == nullptr) {
-    LOG(WARNING) << "Ignoring status update " << update
-                 << " from agent " << *slave
-                 << " because the framework is unknown";
-    metrics->invalid_status_updates++;
-    return;
-  }
-
   LOG(INFO) << "Status update " << update << " from agent " << *slave;
 
   // We ensure that the uuid of task status matches the update's uuid, in case
@@ -5159,8 +5149,21 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
     update.mutable_status()->set_uuid(update.uuid());
   }
 
-  // Forward the update to the framework.
-  forward(update, pid, framework);
+  bool validStatusUpdate = true;
+
+  Framework* framework = getFramework(update.framework_id());
+
+  // A framework might not have re-registered after a master failover,
+  // or it might have been disconnected.
+  if (framework != nullptr && framework->connected) {
+    forward(update, pid, framework);
+  } else {
+    validStatusUpdate = false;
+    LOG(WARNING) << "Received status update " << update << " from agent "
+                 << *slave << " for "
+                 << (framework == nullptr ? "an unknown " : "a disconnected ")
+                 << "framework";
+  }
 
   // Lookup the task and see if we need to update anything locally.
   Task* task = slave->getTask(update.framework_id(), update.status().task_id());
@@ -5179,7 +5182,8 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
     removeTask(task);
   }
 
-  metrics->valid_status_updates++;
+  validStatusUpdate
+    ? metrics->valid_status_updates++ : metrics->invalid_status_updates++;
 }
 
 
@@ -5199,6 +5203,18 @@ void Master::forward(
     LOG(INFO) << "Forwarding status update " << update;
   }
 
+  // The task might not exist in master's memory (e.g., failed task validation).
+  Task* task = framework->getTask(update.status().task_id());
+  if (task != nullptr) {
+    // Set the status update state and uuid for the task. Note that
+    // master-generated updates are terminal and do not have a uuid
+    // (in which case the master also calls `removeTask()`).
+    if (update.has_uuid()) {
+      task->set_status_update_state(update.status().state());
+      task->set_status_update_uuid(update.status().uuid());
+    }
+  }
+
   StatusUpdateMessage message;
   message.mutable_update()->MergeFrom(update);
   message.set_pid(acknowledgee);
@@ -6811,14 +6827,6 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
     }
   }
 
-  // Set the status update state and uuid for the task. Note that
-  // master-generated updates are terminal and do not have a uuid
-  // (in which case the master also calls `removeTask()`).
-  if (update.has_uuid()) {
-    task->set_status_update_state(status.state());
-    task->set_status_update_uuid(update.uuid());
-  }
-
   // TODO(brenden): Consider wiping the `message` field?
   if (task->statuses_size() > 0 &&
       task->statuses(task->statuses_size() - 1).state() == status.state()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e2187f8/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 6709818..b1d7545 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -86,6 +86,9 @@ using mesos::master::contender::MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
 using mesos::master::detector::MasterDetector;
 using mesos::master::detector::StandaloneMasterDetector;
 
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
 using process::Clock;
 using process::Future;
 using process::Owned;
@@ -4658,6 +4661,206 @@ TEST_F(MasterTest, RejectFrameworkWithInvalidFailoverTimeout)
   AWAIT_READY(error);
 }
 
+
+// This test verifies that we recover resources when an orphaned task reaches
+// a terminal state.
+TEST_F(MasterTest, RecoverResourcesOrphanedTask)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillOnce(Return());
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId = subscribed.get().framework_id();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  v1::executor::Mesos* execMesos = nullptr;
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _))
+    .WillOnce(SaveArg<0>(&execMesos));
+
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(executor::SendUpdateFromTask(
+        frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+  Future<Nothing> acknowledged;
+  EXPECT_CALL(*executor, acknowledged(_, _))
+    .WillOnce(FutureSatisfy(&acknowledged));
+
+  Future<Event::Update> update;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(acknowledged);
+  AWAIT_READY(update);
+
+  EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+  EXPECT_TRUE(update->status().has_executor_id());
+  EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  // Failover the master.
+  master->reset();
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  AWAIT_READY(disconnected);
+
+  // Have the agent re-register with the master.
+  detector.appoint(master.get()->pid);
+
+  // Ensure re-registration is complete.
+  Clock::pause();
+  Clock::settle();
+
+  EXPECT_CALL(*executor, acknowledged(_, _));
+
+  Future<v1::executor::Call> updateCall =
+    FUTURE_HTTP_CALL(v1::executor::Call(),
+                     v1::executor::Call::UPDATE,
+                     _,
+                     contentType);
+
+  // Send a terminal status update while the task is an orphan i.e., the
+  // framework has not reconnected.
+  {
+    v1::TaskStatus status;
+    status.mutable_task_id()->CopyFrom(taskInfo.task_id());
+    status.mutable_executor_id()->CopyFrom(evolve(executorId));
+    status.set_state(v1::TASK_FINISHED);
+    status.set_source(v1::TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(UUID::random().toBytes());
+
+    v1::executor::Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+    call.set_type(v1::executor::Call::UPDATE);
+
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+
+    execMesos->send(call);
+  }
+
+  AWAIT_READY(updateCall);
+
+  // Ensure that the update is processed by the agent.
+  Clock::settle();
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  // Advance the clock for the status update manager to retry with the
+  // latest state of the task.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Ensure that the resources are successfully recovered.
+  AWAIT_READY(recoverResources);
+
+  // Ensure that the state of the task is updated to `TASK_FINISHED` on the
+  // master.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_TASKS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    v1::master::Response::GetTasks tasks = deserialize<v1::master::Response>(
+        contentType, response->body).get().get_tasks();
+
+    ASSERT_TRUE(tasks.IsInitialized());
+    ASSERT_EQ(1, tasks.orphan_tasks().size());
+    ASSERT_EQ(TASK_FINISHED, tasks.orphan_tasks(0).state());
+  }
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/2] mesos git commit: Fixed a bug around handling orphaned tasks in `GET_TASKS` call.

Posted by an...@apache.org.
Fixed a bug around handling orphaned tasks in `GET_TASKS` call.

We erroneously nested the orphaned tasks loop inside the loop over
known frameworks. As a result, the list of orphaned tasks was appended
once per known framework (i.e., duplicated when there were several),
and if there were no active frameworks, no orphaned tasks were
returned at all.

Review: https://reviews.apache.org/r/50815/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4097421e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4097421e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4097421e

Branch: refs/heads/master
Commit: 4097421e34a6b749f141b34389d7752a43793c4c
Parents: b406c4a
Author: Anand Mazumdar <an...@apache.org>
Authored: Thu Aug 4 19:00:32 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Thu Aug 4 19:01:07 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp | 56 ++++++++++++++++++++++++------------------------
 1 file changed, 28 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4097421e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index e26dc2f..52dd80b 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3792,36 +3792,36 @@ mesos::master::Response::GetTasks Master::Http::_getTasks(
 
       getTasks.add_completed_tasks()->CopyFrom(*task);
     }
+  }
 
-    // Orphan tasks.
-    foreachvalue (const Slave* slave, master->slaves.registered) {
-      typedef hashmap<TaskID, Task*> TaskMap;
-      foreachvalue (const TaskMap& tasks, slave->tasks) {
-        foreachvalue (const Task* task, tasks) {
-          CHECK_NOTNULL(task);
-          const FrameworkID& frameworkId = task->framework_id();
-          if (!master->frameworks.registered.contains(frameworkId)) {
-            // TODO(joerg84): This logic should be simplified after
-            // a deprecation cycle starting with 1.0 as after that
-            // we can rely on `master->frameworks.recovered` containing
-            // all FrameworkInfos.
-            // Until then there are 3 cases:
-            // - No authorization enabled: show all orphaned tasks.
-            // - Authorization enabled, but no FrameworkInfo present:
-            //   do not show orphaned tasks.
-            // - Authorization enabled, FrameworkInfo present: filter
-            //   based on `approveViewTask`.
-            if (master->authorizer.isSome() &&
-               (!master->frameworks.recovered.contains(frameworkId) ||
-                !approveViewTask(
-                    tasksApprover,
-                    *task,
-                    master->frameworks.recovered[frameworkId]))) {
-              continue;
-            }
-
-            getTasks.add_orphan_tasks()->CopyFrom(*task);
+  // Orphan tasks.
+  foreachvalue (const Slave* slave, master->slaves.registered) {
+    typedef hashmap<TaskID, Task*> TaskMap;
+    foreachvalue (const TaskMap& tasks, slave->tasks) {
+      foreachvalue (const Task* task, tasks) {
+        CHECK_NOTNULL(task);
+        const FrameworkID& frameworkId = task->framework_id();
+        if (!master->frameworks.registered.contains(frameworkId)) {
+          // TODO(joerg84): This logic should be simplified after
+          // a deprecation cycle starting with 1.0 as after that
+          // we can rely on `master->frameworks.recovered` containing
+          // all FrameworkInfos.
+          // Until then there are 3 cases:
+          // - No authorization enabled: show all orphaned tasks.
+          // - Authorization enabled, but no FrameworkInfo present:
+          //   do not show orphaned tasks.
+          // - Authorization enabled, FrameworkInfo present: filter
+          //   based on `approveViewTask`.
+          if (master->authorizer.isSome() &&
+             (!master->frameworks.recovered.contains(frameworkId) ||
+              !approveViewTask(
+                  tasksApprover,
+                  *task,
+                  master->frameworks.recovered[frameworkId]))) {
+            continue;
           }
+
+          getTasks.add_orphan_tasks()->CopyFrom(*task);
         }
       }
     }