You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/09/19 03:24:58 UTC

[1/2] git commit: Added tests for terminal unacknowledged tasks in the Master.

Repository: mesos
Updated Branches:
  refs/heads/master 3636c0ea7 -> aa806af08


Added tests for terminal unacknowledged tasks in the Master.

Review: https://reviews.apache.org/r/25568


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aa806af0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aa806af0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aa806af0

Branch: refs/heads/master
Commit: aa806af0804f880a2d2c0b13092bcf75360566a3
Parents: b3182fb
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:49:54 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Sep 18 18:24:51 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp              |  4 ++
 src/tests/master_tests.cpp         | 79 +++++++++++++++++++++++++++++++++
 src/tests/reconciliation_tests.cpp | 78 ++++++++++++++++++++++++++++++++
 3 files changed, 161 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aa806af0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 34803f7..e64f05f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3151,6 +3151,10 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 // now sent by the StatusUpdateManagerProcess and master itself when
 // it generates TASK_LOST messages. Only 'pid' can be used to identify
 // the slave.
+// TODO(bmahler): The master will not release resources until the
+// slave receives acknowledgements for all non-terminal updates. This
+// means if a framework is down, the resources will remain allocated
+// even though the tasks are terminal on the slaves!
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 {
   ++metrics.messages_status_update;

http://git-wip-us.apache.org/repos/asf/mesos/blob/aa806af0/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index ff2b50f..8e4ec1d 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2273,3 +2273,82 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined)
 
   Shutdown();
 }
+
+
+// This test ensures that the master releases resources for tasks
+// when they terminate, even if no acknowledgements occur.
+TEST_F(MasterTest, UnacknowledgedTerminalTask)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<PID<Master> > master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:1;mem:64";
+  Try<PID<Slave> > slave = StartSlave(&containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Launch a framework and get a task into a terminal state.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers1;
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(DoAll(FutureArg<1>(&offers1),
+                    LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*")))
+    .WillOnce(FutureArg<1>(&offers2)); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  // Capture the status update message from the slave to the master.
+  Future<StatusUpdateMessage> update =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  // Drop the status updates forwarded to the framework to ensure
+  // that the task remains terminal and unacknowledged in the master.
+  DROP_PROTOBUFS(StatusUpdateMessage(), master.get(), _);
+
+  driver.start();
+
+  // Wait until the framework is registered.
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers1);
+
+  // Once the update is sent, the master should re-offer the
+  // resources consumed by the task.
+  AWAIT_READY(update);
+
+  // Don't wait around for the allocation interval.
+  Clock::pause();
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::resume();
+
+  AWAIT_READY(offers2);
+
+  EXPECT_FALSE(offers1.get().empty());
+  EXPECT_FALSE(offers2.get().empty());
+
+  // Ensure we get all of the resources back.
+  EXPECT_EQ(offers1.get()[0].resources(), offers2.get()[0].resources());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/aa806af0/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 1c9e73b..400c5c0 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -664,3 +664,81 @@ TEST_F(ReconciliationTest, PendingTask)
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
+
+
+// This test ensures that the master responds with the latest state
+// for tasks that are terminal at the master, but have not been
+// acknowledged by the framework. See MESOS-1389.
+TEST_F(ReconciliationTest, UnacknowledgedTerminalTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  // Launch a framework and get a task into a terminal state.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> update1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update1));
+
+  // Prevent the slave from retrying the status update by
+  // only allowing a single update through to the master.
+  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+  FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());;
+
+  // Drop the status update acknowledgements to ensure that the
+  // task remains terminal and unacknowledged in the master.
+  DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+
+  driver.start();
+
+  // Wait until the framework is registered.
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(update1);
+  EXPECT_EQ(TASK_FINISHED, update1.get().state());
+  EXPECT_TRUE(update1.get().has_slave_id());
+
+  // Framework should receive a TASK_FINISHED update, since the
+  // master did not receive the acknowledgement.
+  Future<TaskStatus> update2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update2));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  vector<TaskStatus> statuses;
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(update2);
+  EXPECT_EQ(TASK_FINISHED, update2.get().state());
+  EXPECT_TRUE(update2.get().has_slave_id());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}


[2/2] git commit: Hold on to unacknowledged terminal tasks in the Master.

Posted by bm...@apache.org.
Hold on to unacknowledged terminal tasks in the Master.

Review: https://reviews.apache.org/r/25567


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3182fbd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3182fbd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3182fbd

Branch: refs/heads/master
Commit: b3182fbd138137b7864d7bc26cb45e67deaabac0
Parents: 3636c0e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:49:24 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Sep 18 18:24:51 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 131 +++++++++++++++++++++++++++++----------------
 src/master/master.hpp |  11 +++-
 2 files changed, 94 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3182fbd/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 52a7409..34803f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -651,7 +651,7 @@ void Master::finalize()
 
   // Remove the slaves.
   foreachvalue (Slave* slave, slaves.registered) {
-    // Remove tasks.
+    // Remove tasks, don't bother recovering resources.
     foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
       foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
         removeTask(task);
@@ -685,7 +685,8 @@ void Master::finalize()
   // allocator or the roles because it is unnecessary bookkeeping at
   // this point since we are shutting down.
   foreachvalue (Framework* framework, frameworks.registered) {
-    // Remove pending tasks from the framework.
+    // Remove pending tasks from the framework. Don't bother
+    // recovering the resources in the allocator.
     framework->pendingTasks.clear();
 
     // No tasks/executors/offers should remain since the slaves
@@ -2717,6 +2718,12 @@ void Master::statusUpdateAcknowledgement(
     return;
   }
 
+  Task* task = slave->getTask(frameworkId, taskId);
+
+  if (task != NULL && protobuf::isTerminalState(task->state())) {
+    removeTask(task);
+  }
+
   LOG(INFO) << "Forwarding status update acknowledgement "
             << UUID::fromBytes(uuid) << " for task " << taskId
             << " of framework " << frameworkId << " to slave " << *slave;
@@ -3192,8 +3199,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   forward(update, pid, framework);
 
   // Lookup the task and see if we need to update anything locally.
-  const TaskStatus& status = update.status();
-  Task* task = slave->getTask(update.framework_id(), status.task_id());
+  Task* task = slave->getTask(update.framework_id(), update.status().task_id());
   if (task == NULL) {
     LOG(WARNING) << "Could not lookup task for status update " << update
                  << " from slave " << *slave;
@@ -3204,20 +3210,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 
   LOG(INFO) << "Status update " << update << " from slave " << *slave;
 
-  // TODO(brenden) Consider wiping the `data` and `message` fields?
-  if (task->statuses_size() > 0 &&
-      task->statuses(task->statuses_size() - 1).state() == status.state()) {
-    task->mutable_statuses()->RemoveLast();
-  }
-  task->add_statuses()->CopyFrom(status);
-  task->set_state(status.state());
+  updateTask(task, update.status());
 
-  // Handle the task appropriately if it's terminated.
-  if (protobuf::isTerminalState(status.state())) {
+  // If the task is terminal and no acknowledgement is needed,
+  // then remove the task now.
+  if (protobuf::isTerminalState(task->state()) && pid == UPID()) {
     removeTask(task);
   }
 
-  stats.tasks[status.state()]++;
   stats.validStatusUpdates++;
   metrics.valid_status_updates++;
 }
@@ -3763,14 +3763,14 @@ void Master::reconcile(
   // missing from the slave. This could happen if the task was
   // dropped by the slave (e.g., slave exited before getting the
   // task or the task was launched while slave was in recovery).
-  // NOTE: copies are used because statusUpdate() modifies
-  //       slave->tasks.
+  // NOTE: copies are needed because removeTask modifies slave->tasks.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
-        LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+        LOG(WARNING) << "Task " << task->task_id()
                      << " of framework " << task->framework_id()
-                     << " unknown to the slave " << *slave;
+                     << " unknown to the slave " << *slave
+                     << " during re-registration";
 
         const StatusUpdate& update = protobuf::createStatusUpdate(
             task->framework_id(),
@@ -3779,7 +3779,13 @@ void Master::reconcile(
             TASK_LOST,
             "Task is unknown to the slave");
 
-        statusUpdate(update, UPID());
+        updateTask(task, update.status());
+        removeTask(task);
+
+        Framework* framework = getFramework(frameworkId);
+        if (framework != NULL) {
+          forward(update, UPID(), framework);
+        }
       }
     }
   }
@@ -4079,8 +4085,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
 
   // Remove pointers to framework's tasks in slaves, and send status
   // updates.
-  // NOTE: A copy is used because statusUpdate() modifies
-  //       slave->tasks.
+  // NOTE: A copy is needed because removeTask modifies slave->tasks.
   foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) {
     // Remove tasks that belong to this framework.
     if (task->framework_id() == framework->id) {
@@ -4096,7 +4101,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
         (task->has_executor_id()
             ? Option<ExecutorID>(task->executor_id()) : None()));
 
-      statusUpdate(update, UPID());
+      updateTask(task, update.status());
+      removeTask(task);
+      forward(update, UPID(), framework);
     }
   }
 
@@ -4184,11 +4191,6 @@ void Master::readdSlave(
   }
 
   foreach (const Task& task, tasks) {
-    // Ignore tasks that have reached terminal state.
-    if (protobuf::isTerminalState(task.state())) {
-      continue;
-    }
-
     Task* t = new Task(task);
 
     // Add the task to the slave.
@@ -4211,7 +4213,10 @@ void Master::readdSlave(
                    << " running on slave " << *slave;
     }
 
-    resources[task.framework_id()] += task.resources();
+    // Terminal tasks do not consume resources.
+    if (!protobuf::isTerminalState(task.state())) {
+      resources[task.framework_id()] += task.resources();
+    }
   }
 
   // Re-add completed tasks reported by the slave.
@@ -4269,8 +4274,7 @@ void Master::removeSlave(Slave* slave)
           (task->has_executor_id() ?
               Option<ExecutorID>(task->executor_id()) : None()));
 
-      task->add_statuses()->CopyFrom(update.status());
-      task->set_state(update.status().state());
+      updateTask(task, update.status());
       removeTask(task);
 
       updates.push_back(update);
@@ -4369,6 +4373,49 @@ void Master::_removeSlave(
 }
 
 
+void Master::updateTask(Task* task, const TaskStatus& status)
+{
+  // Out-of-order updates should not occur, however in case they
+  // do (e.g. MESOS-1799), prevent them here to ensure that the
+  // resource accounting is not affected.
+  if (protobuf::isTerminalState(task->state()) &&
+      !protobuf::isTerminalState(status.state())) {
+    LOG(ERROR) << "Ignoring out of order status update for task "
+               << task->task_id()
+               << " (" << task->state() << " -> " << status.state() << ")";
+    return;
+  }
+
+  // Once the task becomes terminal, we recover the resources.
+  if (!protobuf::isTerminalState(task->state()) &&
+      protobuf::isTerminalState(status.state())) {
+    allocator->resourcesRecovered(
+        task->framework_id(),
+        task->slave_id(),
+        task->resources(),
+        None());
+
+    switch (status.state()) {
+      case TASK_FINISHED: ++metrics.tasks_finished; break;
+      case TASK_FAILED:   ++metrics.tasks_failed;   break;
+      case TASK_KILLED:   ++metrics.tasks_killed;   break;
+      case TASK_LOST:     ++metrics.tasks_lost;     break;
+      default: break;
+    }
+  }
+
+  // TODO(brenden) Consider wiping the `data` and `message` fields?
+  if (task->statuses_size() > 0 &&
+      task->statuses(task->statuses_size() - 1).state() == status.state()) {
+    task->mutable_statuses()->RemoveLast();
+  }
+  task->add_statuses()->CopyFrom(status);
+  task->set_state(status.state());
+
+  stats.tasks[status.state()]++;
+}
+
+
 void Master::removeTask(Task* task)
 {
   CHECK_NOTNULL(task);
@@ -4381,6 +4428,14 @@ void Master::removeTask(Task* task)
                  << " of framework " << task->framework_id()
                  << " on slave " << *slave
                  << " in non-terminal state " << task->state();
+
+    // If the task is not terminal, then the resources have
+    // not yet been released.
+    allocator->resourcesRecovered(
+        task->framework_id(),
+        task->slave_id(),
+        task->resources(),
+        None());
   } else {
     LOG(INFO) << "Removing task " << task->task_id()
               << " with resources " << task->resources()
@@ -4397,22 +4452,6 @@ void Master::removeTask(Task* task)
   // Remove from slave.
   slave->removeTask(task);
 
-  // Tell the allocator about the recovered resources.
-  allocator->resourcesRecovered(
-      task->framework_id(),
-      task->slave_id(),
-      task->resources(),
-      None());
-
-  // Update the task state metric.
-  switch (task->state()) {
-    case TASK_FINISHED: ++metrics.tasks_finished; break;
-    case TASK_FAILED:   ++metrics.tasks_failed;   break;
-    case TASK_KILLED:   ++metrics.tasks_killed;   break;
-    case TASK_LOST:     ++metrics.tasks_lost;     break;
-    default: break;
-  }
-
   delete task;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3182fbd/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 80d7535..41da240 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -47,6 +47,7 @@
 #include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
 
+#include "common/protobuf_utils.hpp"
 #include "common/type_utils.hpp"
 
 #include "files/files.hpp"
@@ -368,7 +369,11 @@ protected:
       const Filters& filters,
       const process::Future<std::list<process::Future<Option<Error> > > >& f);
 
-  // Remove a task and recover its resources.
+  // Transitions the task, and recovers resources if the task becomes
+  // terminal.
+  void updateTask(Task* task, const TaskStatus& status);
+
+  // Removes the task.
   void removeTask(Task* task);
 
   // Remove an executor and recover its resources.
@@ -908,7 +913,9 @@ struct Slave
 
     foreachkey (const FrameworkID& frameworkId, tasks) {
       foreachvalue (const Task* task, tasks.find(frameworkId)->second) {
-        used += task->resources();
+        if (!protobuf::isTerminalState(task->state())) {
+          used += task->resources();
+        }
       }
     }