You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/10/21 21:15:32 UTC
[8/8] mesos git commit: Changed agent to send TASK_GONE.
Changed agent to send TASK_GONE.
The agent previously sent TASK_LOST updates for tasks that are killed
for various reasons, such as containerizer errors or QoS preemption. The
agent now sends TASK_GONE to partition-aware frameworks instead.
Review: https://reviews.apache.org/r/52803/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01aa3ba4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01aa3ba4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01aa3ba4
Branch: refs/heads/master
Commit: 01aa3ba4a043e3aff04a632008f65a6ba33f8dcb
Parents: b46df16
Author: Neil Conway <ne...@gmail.com>
Authored: Fri Oct 21 14:13:45 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 21 14:13:45 2016 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 70 +++++++++--
.../docker_containerizer_tests.cpp | 16 ++-
src/tests/oversubscription_tests.cpp | 124 ++++++++++++++++++-
src/tests/slave_tests.cpp | 123 +++++++++++++++++-
4 files changed, 312 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e13485c..881c10a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2111,8 +2111,20 @@ void Slave::__run(
Executor* executor = getExecutor(frameworkId, executorId);
if (executor != nullptr) {
+ Framework* framework = getFramework(frameworkId);
+ CHECK_NOTNULL(framework);
+
+ // Send TASK_GONE because the task was started but has now
+ // been terminated. If the framework is not partition-aware,
+ // we send TASK_LOST instead for backward compatibility.
+ mesos::TaskState taskState = TASK_GONE;
+ if (!protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ taskState = TASK_LOST;
+ }
+
ContainerTermination termination;
- termination.set_state(TASK_LOST);
+ termination.set_state(taskState);
termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
termination.set_message(
"Failed to update resources for container: " +
@@ -3624,8 +3636,20 @@ void Slave::_reregisterExecutor(
Executor* executor = getExecutor(frameworkId, executorId);
if (executor != nullptr) {
+ Framework* framework = getFramework(frameworkId);
+ CHECK_NOTNULL(framework);
+
+ // Send TASK_GONE because the task was started but has now
+ // been terminated. If the framework is not partition-aware,
+ // we send TASK_LOST instead for backward compatibility.
+ mesos::TaskState taskState = TASK_GONE;
+ if (!protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ taskState = TASK_LOST;
+ }
+
ContainerTermination termination;
- termination.set_state(TASK_LOST);
+ termination.set_state(taskState);
termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
termination.set_message(
"Failed to update resources for container: " +
@@ -3667,8 +3691,18 @@ void Slave::reregisterExecutorTimeout()
executor->state = Executor::TERMINATING;
+ // Send TASK_GONE because the task was started but has now
+ // been terminated. If the framework is not partition-aware,
+ // we send TASK_LOST instead for backward compatibility.
+ mesos::TaskState taskState = TASK_GONE;
+ if (!protobuf::frameworkHasCapability(
+ framework->info,
+ FrameworkInfo::Capability::PARTITION_AWARE)) {
+ taskState = TASK_LOST;
+ }
+
ContainerTermination termination;
- termination.set_state(TASK_LOST);
+ termination.set_state(taskState);
termination.add_reasons(
TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT);
termination.set_message(
@@ -3982,8 +4016,20 @@ void Slave::__statusUpdate(
Executor* executor = getExecutor(update.framework_id(), executorId);
if (executor != nullptr) {
+ Framework* framework = getFramework(update.framework_id());
+ CHECK_NOTNULL(framework);
+
+ // Send TASK_GONE because the task was started but has now
+ // been terminated. If the framework is not partition-aware,
+ // we send TASK_LOST instead for backward compatibility.
+ mesos::TaskState taskState = TASK_GONE;
+ if (!protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ taskState = TASK_LOST;
+ }
+
ContainerTermination termination;
- termination.set_state(TASK_LOST);
+ termination.set_state(taskState);
termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
termination.set_message(
"Failed to update resources for container: " +
@@ -4644,10 +4690,10 @@ void Slave::executorTerminated(
executor->state = Executor::TERMINATED;
- // Transition all live tasks to TASK_LOST/TASK_FAILED.
+ // Transition all live tasks to TASK_GONE/TASK_FAILED.
// If the containerizer killed the executor (e.g., due to OOM event)
// or if this is a command executor, we send TASK_FAILED status updates
- // instead of TASK_LOST.
+ // instead of TASK_GONE.
// NOTE: We don't send updates if the framework is terminating
// because we don't want the status update manager to keep retrying
// these updates since it won't receive ACKs from the scheduler. Also,
@@ -5698,8 +5744,18 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
// (MESOS-2875).
executor->state = Executor::TERMINATING;
+ // Send TASK_GONE because the task was started but has now
+ // been terminated. If the framework is not partition-aware,
+ // we send TASK_LOST instead for backward compatibility.
+ mesos::TaskState taskState = TASK_GONE;
+ if (!protobuf::frameworkHasCapability(
+ framework->info,
+ FrameworkInfo::Capability::PARTITION_AWARE)) {
+ taskState = TASK_LOST;
+ }
+
ContainerTermination termination;
- termination.set_state(TASK_LOST);
+ termination.set_state(taskState);
termination.add_reasons(TaskStatus::REASON_CONTAINER_PREEMPTED);
termination.set_message("Container preempted by QoS correction");
http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 6d26797..73ae390 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -3382,9 +3382,13 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ FrameworkInfo::Capability::PARTITION_AWARE);
+
MockScheduler sched;
MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
@@ -3423,9 +3427,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
- Future<TaskStatus> statusLost;
+ Future<TaskStatus> statusGone;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&statusLost));
+ .WillOnce(FutureArg<1>(&statusGone));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
@@ -3441,10 +3445,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
AWAIT_READY_FOR(containerId, Seconds(60));
- AWAIT_READY(statusLost);
- EXPECT_EQ(TASK_LOST, statusLost.get().state());
+ AWAIT_READY(statusGone);
+ EXPECT_EQ(TASK_GONE, statusGone.get().state());
EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED,
- statusLost.get().reason());
+ statusGone.get().reason());
driver.stop();
driver.join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index b356fb6..027f549 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -943,9 +943,9 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
}
-// This test verifies that a QoS controller can kill a running task
-// and that a TASK_LOST with REASON_EXECUTOR_PREEMPTED is sent to the
-// framework.
+// This test verifies that a QoS controller can kill a running task,
+// and that this results in sending a TASK_LOST status update with
+// REASON_CONTAINER_PREEMPTED if the framework is not partition-aware.
TEST_F(OversubscriptionTest, QoSCorrectionKill)
{
Try<Owned<cluster::Master>> master = StartMaster();
@@ -1033,12 +1033,124 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
// Verify task status is TASK_LOST.
AWAIT_READY(status2);
- ASSERT_EQ(TASK_LOST, status2->state());
- ASSERT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+ EXPECT_EQ(TASK_LOST, status2->state());
+ EXPECT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status2->source());
- // Verify that slave incremented counter for preempted executors.
+ // Verify that slave incremented metrics appropriately.
snapshot = Metrics();
EXPECT_EQ(1u, snapshot.values["slave/executors_preempted"]);
+ EXPECT_EQ(1u, snapshot.values["slave/tasks_lost"]);
+ EXPECT_EQ(0u, snapshot.values["slave/tasks_gone"]);
+
+ driver.stop();
+ driver.join();
+}
+
+
+// This test verifies that a QoS controller can kill a running task,
+// and that this results in sending a TASK_GONE status update with
+// REASON_CONTAINER_PREEMPTED if the framework is partition-aware.
+TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockQoSController controller;
+
+ Queue<list<mesos::slave::QoSCorrection>> corrections;
+
+ EXPECT_CALL(controller, corrections())
+ .WillRepeatedly(InvokeWithoutArgs(
+ &corrections,
+ &Queue<list<mesos::slave::QoSCorrection>>::get));
+
+ Future<lambda::function<Future<ResourceUsage>()>> usageCallback;
+
+ // Catching callback which is passed to the QoS Controller.
+ EXPECT_CALL(controller, initialize(_))
+ .WillOnce(DoAll(FutureArg<0>(&usageCallback), Return(Nothing())));
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), &controller, CreateSlaveFlags());
+ ASSERT_SOME(slave);
+
+ // Verify presence and initial value of counter for preempted
+ // executors.
+ JSON::Object snapshot = Metrics();
+ EXPECT_EQ(1u, snapshot.values.count("slave/executors_preempted"));
+ EXPECT_EQ(0u, snapshot.values["slave/executors_preempted"]);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ FrameworkInfo::Capability::PARTITION_AWARE);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 10");
+
+ Future<TaskStatus> status1;
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status1))
+ .WillOnce(FutureArg<1>(&status2))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(status1);
+ ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+ AWAIT_READY(usageCallback);
+
+ Future<ResourceUsage> usage = usageCallback.get()();
+ AWAIT_READY(usage);
+
+ // Expecting the same statistics as those returned by the mocked containerizer.
+ ASSERT_EQ(1, usage.get().executors_size());
+
+ const ResourceUsage::Executor& executor = usage.get().executors(0);
+ // Carry out kill correction.
+ QoSCorrection killCorrection;
+
+ QoSCorrection::Kill* kill = killCorrection.mutable_kill();
+ kill->mutable_framework_id()->CopyFrom(frameworkId.get());
+ kill->mutable_executor_id()->CopyFrom(executor.executor_info().executor_id());
+ kill->mutable_container_id()->CopyFrom(executor.container_id());
+
+ corrections.put({killCorrection});
+
+ // Verify task status is TASK_GONE.
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_GONE, status2->state());
+ EXPECT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status2->source());
+
+ // Verify that slave incremented metrics appropriately.
+ snapshot = Metrics();
+ EXPECT_EQ(1u, snapshot.values["slave/executors_preempted"]);
+ EXPECT_EQ(1u, snapshot.values["slave/tasks_gone"]);
+ EXPECT_EQ(0u, snapshot.values["slave/tasks_lost"]);
driver.stop();
driver.join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 4395a67..8717ed1 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2316,8 +2316,9 @@ TEST_F(SlaveTest, DISABLED_TerminatingSlaveDoesNotReregister)
// This test verifies the slave will destroy a container if, when
// receiving a terminal status task update, updating the container's
-// resources fails.
-TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
+// resources fails. A non-partition-aware framework should receive
+// TASK_LOST in this situation.
+TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFailsWithLost)
{
// Start a master.
Try<Owned<cluster::Master>> master = StartMaster();
@@ -2333,6 +2334,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
+ // Connect a non-partition-aware scheduler.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
@@ -2412,6 +2414,123 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
AWAIT_READY(executorLost);
+ JSON::Object stats = Metrics();
+ EXPECT_EQ(0, stats.values["slave/tasks_gone"]);
+ EXPECT_EQ(1, stats.values["slave/tasks_lost"]);
+
+ driver.stop();
+ driver.join();
+}
+
+
+// This test verifies the slave will destroy a container if, when
+// receiving a terminal status task update, updating the container's
+// resources fails. A partition-aware framework should receive
+// TASK_GONE in this situation.
+TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFailsWithGone)
+{
+ // Start a master.
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ // Start a slave.
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+ ASSERT_SOME(slave);
+
+ // Connect a partition-aware scheduler.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ FrameworkInfo::Capability::PARTITION_AWARE);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+ Offer offer = offers.get()[0];
+
+ // Start two tasks.
+ vector<TaskInfo> tasks;
+
+ tasks.push_back(createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:0.1;mem:32").get(),
+ "sleep 1000",
+ exec.id));
+
+ tasks.push_back(createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:0.1;mem:32").get(),
+ "sleep 1000",
+ exec.id));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status1, status2, status3, status4;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status1))
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4));
+
+ driver.launchTasks(offer.id(), tasks);
+
+ AWAIT_READY(status1);
+ EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_RUNNING, status2.get().state());
+
+ // Set up the containerizer so the next update() will fail.
+ EXPECT_CALL(containerizer, update(_, _))
+ .WillOnce(Return(Failure("update() failed")))
+ .WillRepeatedly(Return(Nothing()));
+
+ EXPECT_CALL(exec, killTask(_, _))
+ .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+ Future<Nothing> executorLost;
+ EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+ .WillOnce(FutureSatisfy(&executorLost));
+
+ // Kill one of the tasks. The failed update should result in the
+ // second task transitioning to TASK_GONE when the container is destroyed.
+ driver.killTask(tasks[0].task_id());
+
+ AWAIT_READY(status3);
+ EXPECT_EQ(TASK_KILLED, status3.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
+
+ AWAIT_READY(status4);
+ EXPECT_EQ(TASK_GONE, status4->state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source());
+ EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason());
+
+ AWAIT_READY(executorLost);
+
+ JSON::Object stats = Metrics();
+ EXPECT_EQ(1, stats.values["slave/tasks_gone"]);
+ EXPECT_EQ(0, stats.values["slave/tasks_lost"]);
+
driver.stop();
driver.join();
}