Posted to commits@mesos.apache.org by vi...@apache.org on 2016/10/21 21:15:32 UTC

[8/8] mesos git commit: Changed agent to send TASK_GONE.

Changed agent to send TASK_GONE.

The agent previously sent TASK_LOST updates for tasks that it kills
for various reasons, such as containerizer update failures, executor
re-registration timeouts, or QoS preemption. The agent now sends
TASK_GONE to partition-aware frameworks instead; frameworks without
the PARTITION_AWARE capability continue to receive TASK_LOST.
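
For context (not part of this commit), a scheduler opts into the new
behavior by advertising the PARTITION_AWARE capability when it
registers. The sketch below mirrors the capability setup used in the
tests further down in this diff; the helper name and the user/name
values are illustrative, not taken from the Mesos source.

#include <string>

#include <mesos/mesos.hpp>

// Build a FrameworkInfo that opts into partition-awareness, so the
// agent reports agent-side kills (containerizer update failures,
// QoS preemption, etc.) as TASK_GONE rather than TASK_LOST.
mesos::FrameworkInfo makePartitionAwareFrameworkInfo(
    const std::string& user, const std::string& name)
{
  mesos::FrameworkInfo info;
  info.set_user(user);
  info.set_name(name);

  info.add_capabilities()->set_type(
      mesos::FrameworkInfo::Capability::PARTITION_AWARE);

  return info;
}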

Review: https://reviews.apache.org/r/52803/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01aa3ba4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01aa3ba4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01aa3ba4

Branch: refs/heads/master
Commit: 01aa3ba4a043e3aff04a632008f65a6ba33f8dcb
Parents: b46df16
Author: Neil Conway <ne...@gmail.com>
Authored: Fri Oct 21 14:13:45 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 21 14:13:45 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                             |  70 +++++++++--
 .../docker_containerizer_tests.cpp              |  16 ++-
 src/tests/oversubscription_tests.cpp            | 124 ++++++++++++++++++-
 src/tests/slave_tests.cpp                       | 123 +++++++++++++++++-
 4 files changed, 312 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e13485c..881c10a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2111,8 +2111,20 @@ void Slave::__run(
 
     Executor* executor = getExecutor(frameworkId, executorId);
     if (executor != nullptr) {
+      Framework* framework = getFramework(frameworkId);
+      CHECK_NOTNULL(framework);
+
+      // Send TASK_GONE because the task was started but has now
+      // been terminated. If the framework is not partition-aware,
+      // we send TASK_LOST instead for backward compatibility.
+      mesos::TaskState taskState = TASK_GONE;
+      if (!protobuf::frameworkHasCapability(
+              framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       ContainerTermination termination;
-      termination.set_state(TASK_LOST);
+      termination.set_state(taskState);
       termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
       termination.set_message(
           "Failed to update resources for container: " +
@@ -3624,8 +3636,20 @@ void Slave::_reregisterExecutor(
 
     Executor* executor = getExecutor(frameworkId, executorId);
     if (executor != nullptr) {
+      Framework* framework = getFramework(frameworkId);
+      CHECK_NOTNULL(framework);
+
+      // Send TASK_GONE because the task was started but has now
+      // been terminated. If the framework is not partition-aware,
+      // we send TASK_LOST instead for backward compatibility.
+      mesos::TaskState taskState = TASK_GONE;
+      if (!protobuf::frameworkHasCapability(
+              framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       ContainerTermination termination;
-      termination.set_state(TASK_LOST);
+      termination.set_state(taskState);
       termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
       termination.set_message(
           "Failed to update resources for container: " +
@@ -3667,8 +3691,18 @@ void Slave::reregisterExecutorTimeout()
 
           executor->state = Executor::TERMINATING;
 
+          // Send TASK_GONE because the task was started but has now
+          // been terminated. If the framework is not partition-aware,
+          // we send TASK_LOST instead for backward compatibility.
+          mesos::TaskState taskState = TASK_GONE;
+          if (!protobuf::frameworkHasCapability(
+                  framework->info,
+                  FrameworkInfo::Capability::PARTITION_AWARE)) {
+            taskState = TASK_LOST;
+          }
+
           ContainerTermination termination;
-          termination.set_state(TASK_LOST);
+          termination.set_state(taskState);
           termination.add_reasons(
               TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT);
           termination.set_message(
@@ -3982,8 +4016,20 @@ void Slave::__statusUpdate(
 
     Executor* executor = getExecutor(update.framework_id(), executorId);
     if (executor != nullptr) {
+      Framework* framework = getFramework(update.framework_id());
+      CHECK_NOTNULL(framework);
+
+      // Send TASK_GONE because the task was started but has now
+      // been terminated. If the framework is not partition-aware,
+      // we send TASK_LOST instead for backward compatibility.
+      mesos::TaskState taskState = TASK_GONE;
+      if (!protobuf::frameworkHasCapability(
+              framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       ContainerTermination termination;
-      termination.set_state(TASK_LOST);
+      termination.set_state(taskState);
       termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
       termination.set_message(
           "Failed to update resources for container: " +
@@ -4644,10 +4690,10 @@ void Slave::executorTerminated(
 
       executor->state = Executor::TERMINATED;
 
-      // Transition all live tasks to TASK_LOST/TASK_FAILED.
+      // Transition all live tasks to TASK_GONE/TASK_FAILED.
       // If the containerizer killed the executor (e.g., due to OOM event)
       // or if this is a command executor, we send TASK_FAILED status updates
-      // instead of TASK_LOST.
+      // instead of TASK_GONE.
       // NOTE: We don't send updates if the framework is terminating
       // because we don't want the status update manager to keep retrying
       // these updates since it won't receive ACKs from the scheduler.  Also,
@@ -5698,8 +5744,18 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
           // (MESOS-2875).
           executor->state = Executor::TERMINATING;
 
+          // Send TASK_GONE because the task was started but has now
+          // been terminated. If the framework is not partition-aware,
+          // we send TASK_LOST instead for backward compatibility.
+          mesos::TaskState taskState = TASK_GONE;
+          if (!protobuf::frameworkHasCapability(
+                  framework->info,
+                  FrameworkInfo::Capability::PARTITION_AWARE)) {
+            taskState = TASK_LOST;
+          }
+
           ContainerTermination termination;
-          termination.set_state(TASK_LOST);
+          termination.set_state(taskState);
           termination.add_reasons(TaskStatus::REASON_CONTAINER_PREEMPTED);
           termination.set_message("Container preempted by QoS correction");
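
The same TASK_GONE/TASK_LOST selection is repeated inline at every
call site in the hunks above. Purely as an illustration (the commit
does not introduce such a helper), the check could be written once on
top of the protobuf::frameworkHasCapability() utility the new code
already uses, in the same namespace context as slave.cpp:

// Illustration only; assumes a valid Framework pointer, as the
// hunks above CHECK for. Partition-aware frameworks get TASK_GONE,
// all others keep getting TASK_LOST for backward compatibility.
mesos::TaskState terminalStateFor(const Framework* framework)
{
  return protobuf::frameworkHasCapability(
             framework->info,
             FrameworkInfo::Capability::PARTITION_AWARE)
    ? TASK_GONE
    : TASK_LOST;
}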
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 6d26797..73ae390 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -3382,9 +3382,13 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
     StartSlave(detector.get(), &dockerContainerizer);
   ASSERT_SOME(slave);
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   Future<FrameworkID> frameworkId;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -3423,9 +3427,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
   task.mutable_command()->CopyFrom(command);
   task.mutable_container()->CopyFrom(containerInfo);
 
-  Future<TaskStatus> statusLost;
+  Future<TaskStatus> statusGone;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&statusLost));
+    .WillOnce(FutureArg<1>(&statusGone));
 
   Future<ContainerID> containerId;
   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
@@ -3441,10 +3445,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
 
   AWAIT_READY_FOR(containerId, Seconds(60));
 
-  AWAIT_READY(statusLost);
-  EXPECT_EQ(TASK_LOST, statusLost.get().state());
+  AWAIT_READY(statusGone);
+  EXPECT_EQ(TASK_GONE, statusGone.get().state());
   EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED,
-            statusLost.get().reason());
+            statusGone.get().reason());
 
   driver.stop();
   driver.join();
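
The test above shows a partition-aware framework now receiving
TASK_GONE where it used to get TASK_LOST. On the scheduler side, a
hypothetical handler (illustrative only; handleStatusUpdate and
rescheduleTask are not Mesos APIs) might treat TASK_GONE the same way
it previously treated TASK_LOST from the agent:

#include <mesos/mesos.hpp>

// Assumed application-level helper, declared here for completeness.
void rescheduleTask(const mesos::TaskID& taskId);

// Hypothetical status handling for a partition-aware framework:
// agent-side kills now arrive as TASK_GONE instead of TASK_LOST.
void handleStatusUpdate(const mesos::TaskStatus& status)
{
  switch (status.state()) {
    case mesos::TASK_GONE:  // partition-aware frameworks (this commit)
    case mesos::TASK_LOST:  // frameworks without PARTITION_AWARE
      rescheduleTask(status.task_id());
      break;
    default:
      break;
  }
}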

http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index b356fb6..027f549 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -943,9 +943,9 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
 }
 
 
-// This test verifies that a QoS controller can kill a running task
-// and that a TASK_LOST with REASON_EXECUTOR_PREEMPTED is sent to the
-// framework.
+// This test verifies that a QoS controller can kill a running task,
+// and that this results in a TASK_LOST status update with
+// REASON_CONTAINER_PREEMPTED if the framework is not partition-aware.
 TEST_F(OversubscriptionTest, QoSCorrectionKill)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -1033,12 +1033,124 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
 
   // Verify task status is TASK_LOST.
   AWAIT_READY(status2);
-  ASSERT_EQ(TASK_LOST, status2->state());
-  ASSERT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+  EXPECT_EQ(TASK_LOST, status2->state());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status2->source());
 
-  // Verify that slave incremented counter for preempted executors.
+  // Verify that slave incremented metrics appropriately.
   snapshot = Metrics();
   EXPECT_EQ(1u, snapshot.values["slave/executors_preempted"]);
+  EXPECT_EQ(1u, snapshot.values["slave/tasks_lost"]);
+  EXPECT_EQ(0u, snapshot.values["slave/tasks_gone"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a QoS controller can kill a running task,
+// and that this results in a TASK_GONE status update with
+// REASON_CONTAINER_PREEMPTED if the framework is partition-aware.
+TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockQoSController controller;
+
+  Queue<list<mesos::slave::QoSCorrection>> corrections;
+
+  EXPECT_CALL(controller, corrections())
+    .WillRepeatedly(InvokeWithoutArgs(
+        &corrections,
+        &Queue<list<mesos::slave::QoSCorrection>>::get));
+
+  Future<lambda::function<Future<ResourceUsage>()>> usageCallback;
+
+  // Capture the callback that is passed to the QoS Controller.
+  EXPECT_CALL(controller, initialize(_))
+    .WillOnce(DoAll(FutureArg<0>(&usageCallback), Return(Nothing())));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &controller, CreateSlaveFlags());
+  ASSERT_SOME(slave);
+
+  // Verify presence and initial value of counter for preempted
+  // executors.
+  JSON::Object snapshot = Metrics();
+  EXPECT_EQ(1u, snapshot.values.count("slave/executors_preempted"));
+  EXPECT_EQ(0u, snapshot.values["slave/executors_preempted"]);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 10");
+
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(usageCallback);
+
+  Future<ResourceUsage> usage = usageCallback.get()();
+  AWAIT_READY(usage);
+
+  // Expect the same statistics as those from the mocked containerizer.
+  ASSERT_EQ(1, usage.get().executors_size());
+
+  const ResourceUsage::Executor& executor = usage.get().executors(0);
+  // Carry out kill correction.
+  QoSCorrection killCorrection;
+
+  QoSCorrection::Kill* kill = killCorrection.mutable_kill();
+  kill->mutable_framework_id()->CopyFrom(frameworkId.get());
+  kill->mutable_executor_id()->CopyFrom(executor.executor_info().executor_id());
+  kill->mutable_container_id()->CopyFrom(executor.container_id());
+
+  corrections.put({killCorrection});
+
+  // Verify task status is TASK_GONE.
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_GONE, status2->state());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status2->source());
+
+  // Verify that slave incremented metrics appropriately.
+  snapshot = Metrics();
+  EXPECT_EQ(1u, snapshot.values["slave/executors_preempted"]);
+  EXPECT_EQ(1u, snapshot.values["slave/tasks_gone"]);
+  EXPECT_EQ(0u, snapshot.values["slave/tasks_lost"]);
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 4395a67..8717ed1 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2316,8 +2316,9 @@ TEST_F(SlaveTest, DISABLED_TerminatingSlaveDoesNotReregister)
 
 // This test verifies the slave will destroy a container if, when
 // receiving a terminal status task update, updating the container's
-// resources fails.
-TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
+// resources fails. A non-partition-aware framework should receive
+// TASK_LOST in this situation.
+TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFailsWithLost)
 {
   // Start a master.
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -2333,6 +2334,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
   ASSERT_SOME(slave);
 
+  // Connect a non-partition-aware scheduler.
   MockScheduler sched;
   MesosSchedulerDriver driver(
       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
@@ -2412,6 +2414,123 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
 
   AWAIT_READY(executorLost);
 
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0, stats.values["slave/tasks_gone"]);
+  EXPECT_EQ(1, stats.values["slave/tasks_lost"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies the slave will destroy a container if, when
+// receiving a terminal status task update, updating the container's
+// resources fails. A partition-aware framework should receive
+// TASK_GONE in this situation.
+TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFailsWithGone)
+{
+  // Start a master.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // Start a slave.
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  // Connect a partition-aware scheduler.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  // Start two tasks.
+  vector<TaskInfo> tasks;
+
+  tasks.push_back(createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:0.1;mem:32").get(),
+      "sleep 1000",
+      exec.id));
+
+  tasks.push_back(createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:0.1;mem:32").get(),
+      "sleep 1000",
+      exec.id));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status1, status2, status3, status4;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3))
+    .WillOnce(FutureArg<1>(&status4));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_RUNNING, status2.get().state());
+
+  // Set up the containerizer so the next update() will fail.
+  EXPECT_CALL(containerizer, update(_, _))
+    .WillOnce(Return(Failure("update() failed")))
+    .WillRepeatedly(Return(Nothing()));
+
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
+  // Kill one of the tasks. The failed update should result in the
+  // second task going to TASK_GONE when the container is destroyed.
+  driver.killTask(tasks[0].task_id());
+
+  AWAIT_READY(status3);
+  EXPECT_EQ(TASK_KILLED, status3.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
+
+  AWAIT_READY(status4);
+  EXPECT_EQ(TASK_GONE, status4->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason());
+
+  AWAIT_READY(executorLost);
+
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(1, stats.values["slave/tasks_gone"]);
+  EXPECT_EQ(0, stats.values["slave/tasks_lost"]);
+
   driver.stop();
   driver.join();
 }