You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/10/19 23:33:56 UTC
[06/10] mesos git commit: Changed master to send TASK_DROPPED for
task launch errors.
Changed master to send TASK_DROPPED for task launch errors.
When a task launch fails due to a transient error (e.g., insufficient
available resources at an agent), the master sends a TASK_LOST update to
the framework. For PARTITION_AWARE frameworks, we now send TASK_DROPPED
instead.
Review: https://reviews.apache.org/r/52659/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4eac0b06
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4eac0b06
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4eac0b06
Branch: refs/heads/master
Commit: 4eac0b0663de87c0fdde6f9e8c42566f99a3dfaf
Parents: f8a0c28
Author: Neil Conway <ne...@gmail.com>
Authored: Wed Oct 19 16:32:02 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Oct 19 16:32:02 2016 -0700
----------------------------------------------------------------------
src/master/master.cpp | 38 ++++-
src/tests/master_authorization_tests.cpp | 126 +++++++++++++--
src/tests/master_tests.cpp | 224 ++++++++++++++++++++++++--
3 files changed, 358 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 324391a..2fc41f5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3498,7 +3498,9 @@ void Master::accept(
}
}
- // If invalid, send TASK_LOST for the launch attempts.
+ // If invalid, send TASK_DROPPED for the launch attempts. If the
+ // framework is not partition-aware, send TASK_LOST instead.
+ //
// TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
// consistently handle message dropping. It would be ideal if the
// 'drop' overload can handle both resource recovery and lost task
@@ -3507,6 +3509,12 @@ void Master::accept(
LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
<< "': " << error.get().message;
+ TaskState newTaskState = TASK_DROPPED;
+ if (!protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ newTaskState = TASK_LOST;
+ }
+
foreach (const Offer::Operation& operation, accept.operations()) {
if (operation.type() != Offer::Operation::LAUNCH &&
operation.type() != Offer::Operation::LAUNCH_GROUP) {
@@ -3527,16 +3535,21 @@ void Master::accept(
framework->id(),
task.slave_id(),
task.task_id(),
- TASK_LOST,
+ newTaskState,
TaskStatus::SOURCE_MASTER,
None(),
"Task launched with invalid offers: " + error.get().message,
TaskStatus::REASON_INVALID_OFFERS);
- metrics->tasks_lost++;
+ if (protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ metrics->tasks_dropped++;
+ } else {
+ metrics->tasks_lost++;
+ }
metrics->incrementTasksStates(
- TASK_LOST,
+ newTaskState,
TaskStatus::SOURCE_MASTER,
TaskStatus::REASON_INVALID_OFFERS);
@@ -3702,6 +3715,12 @@ void Master::_accept(
Slave* slave = slaves.registered.get(slaveId);
if (slave == nullptr || !slave->connected) {
+ TaskState newTaskState = TASK_DROPPED;
+ if (!protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ newTaskState = TASK_LOST;
+ }
+
foreach (const Offer::Operation& operation, accept.operations()) {
if (operation.type() != Offer::Operation::LAUNCH &&
operation.type() != Offer::Operation::LAUNCH_GROUP) {
@@ -3734,16 +3753,21 @@ void Master::_accept(
framework->id(),
task.slave_id(),
task.task_id(),
- TASK_LOST,
+ newTaskState,
TaskStatus::SOURCE_MASTER,
None(),
slave == nullptr ? "Agent removed" : "Agent disconnected",
reason);
- metrics->tasks_lost++;
+ if (protobuf::frameworkHasCapability(
+ framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ metrics->tasks_dropped++;
+ } else {
+ metrics->tasks_lost++;
+ }
metrics->incrementTasksStates(
- TASK_LOST,
+ newTaskState,
TaskStatus::SOURCE_MASTER,
reason);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index a53e270..001d4b3 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -567,8 +567,9 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup)
// This test verifies that a slave removal that comes before
-// '_accept()' is called results in TASK_LOST.
-TEST_F(MasterAuthorizationTest, SlaveRemoved)
+// '_accept()' is called results in TASK_LOST for a framework that is
+// not partition-aware.
+TEST_F(MasterAuthorizationTest, SlaveRemovedLost)
{
MockAuthorizer authorizer;
Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -646,9 +647,7 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved)
// Check metrics.
JSON::Object stats = Metrics();
- EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
- EXPECT_EQ(1u, stats.values.count(
- "master/task_lost/source_master/reason_slave_removed"));
+ EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
EXPECT_EQ(
1u, stats.values["master/task_lost/source_master/reason_slave_removed"]);
@@ -669,9 +668,117 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved)
}
+// This test verifies that a slave removal that happens before
+// '_accept()' is called (i.e., while authorization is pending)
+// results in TASK_DROPPED for a partition-aware framework.
+TEST_F(MasterAuthorizationTest, SlaveRemovedDropped)
+{
+ MockAuthorizer authorizer;
+ Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ FrameworkInfo::Capability::PARTITION_AWARE);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+ // Return a pending future from the authorizer.
+ Future<Nothing> authorize;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorized(_))
+ .WillOnce(DoAll(FutureSatisfy(&authorize),
+ Return(promise.future())));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(authorize);
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ // Stop the slave with an explicit shutdown; otherwise, with
+ // checkpointing, the master would wait for the slave to reconnect.
+ slave.get()->shutdown();
+ slave->reset();
+
+ AWAIT_READY(slaveLost);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> recoverResources =
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+ // Now complete authorization.
+ promise.set(true);
+
+ // The partition-aware framework should get a TASK_DROPPED.
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_DROPPED, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_MASTER, status.get().source());
+ EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status.get().reason());
+
+ // No task launch should happen, so all of the resources should be
+ // returned to the allocator.
+ AWAIT_READY(recoverResources);
+
+ // Check metrics: the task is counted as dropped, not lost.
+ JSON::Object stats = Metrics();
+ EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
+ EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
+ EXPECT_EQ(
+ 1u,
+ stats.values["master/task_dropped/source_master/reason_slave_removed"]);
+
+ // Make sure the task is no longer known to the master.
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .Times(0);
+
+ driver.reconcileTasks({});
+
+ // We settle the clock here to ensure any updates sent by the master
+ // are received. There shouldn't be any updates in this case.
+ Clock::pause();
+ Clock::settle();
+
+ driver.stop();
+ driver.join();
+}
+
+
// This test verifies that a slave disconnection that comes before
-// '_launchTasks()' is called results in TASK_LOST.
-TEST_F(MasterAuthorizationTest, SlaveDisconnected)
+// '_accept()' is called results in TASK_LOST for a framework
+// that is not partition-aware.
+TEST_F(MasterAuthorizationTest, SlaveDisconnectedLost)
{
MockAuthorizer authorizer;
Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -750,11 +857,8 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected)
// Check metrics.
JSON::Object stats = Metrics();
- EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+ EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
- EXPECT_EQ(1u,
- stats.values.count(
- "master/task_lost/source_master/reason_slave_removed"));
EXPECT_EQ(
1u,
stats.values["master/task_lost/source_master/reason_slave_removed"]);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index df492d3..b31502f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1488,8 +1488,10 @@ TEST_F(MasterTest, LaunchCombinedOfferTest)
}
-// Test ensures offers for launchTasks cannot span multiple slaves.
-TEST_F(MasterTest, LaunchAcrossSlavesTest)
+// This test ensures that the offers provided to a single launchTasks
+// call cannot span multiple slaves. A non-partition-aware framework
+// should receive TASK_LOST.
+TEST_F(MasterTest, LaunchAcrossSlavesLost)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -1580,24 +1582,134 @@ TEST_F(MasterTest, LaunchAcrossSlavesTest)
// Check metrics.
JSON::Object stats = Metrics();
- EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+ EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
EXPECT_EQ(
1u,
+ stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+
+ driver.stop();
+ driver.join();
+}
+
+
+// This test ensures that the offers provided to a single launchTasks
+// call cannot span multiple slaves. A partition-aware framework
+// should receive TASK_DROPPED.
+TEST_F(MasterTest, LaunchAcrossSlavesDropped)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ // See LaunchCombinedOfferTest() for resource size motivation.
+ Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+ Resources twoSlaves = fullSlave + fullSlave;
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resources = Option<string>(stringify(fullSlave));
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave1 =
+ StartSlave(detector.get(), &containerizer, flags);
+ ASSERT_SOME(slave1);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ FrameworkInfo::Capability::PARTITION_AWARE);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ driver.start();
+
+ AWAIT_READY(offers1);
+ EXPECT_NE(0u, offers1.get().size());
+ Resources resources1(offers1.get()[0].resources());
+ EXPECT_EQ(2, resources1.cpus().get());
+ EXPECT_EQ(Megabytes(1024), resources1.mem().get());
+
+ // Start a second slave to obtain an offer from a different slave.
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers2))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Create new Flags as we require another work_dir for checkpoints.
+ slave::Flags flags2 = CreateSlaveFlags();
+ flags2.resources = Option<string>(stringify(fullSlave));
+
+ Try<Owned<cluster::Slave>> slave2 =
+ StartSlave(detector.get(), &containerizer, flags2);
+ ASSERT_SOME(slave2);
+
+ AWAIT_READY(offers2);
+ EXPECT_NE(0u, offers2.get().size());
+ Resources resources2(offers2.get()[0].resources());
+ EXPECT_EQ(2, resources2.cpus().get());
+ EXPECT_EQ(Megabytes(1024), resources2.mem().get());
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers1.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(twoSlaves);
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ vector<OfferID> combinedOffers;
+ combinedOffers.push_back(offers1.get()[0].id());
+ combinedOffers.push_back(offers2.get()[0].id());
+
+ Future<Nothing> recoverResources =
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+ driver.launchTasks(combinedOffers, {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_DROPPED, status.get().state());
+ EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
+
+ // The resources of the invalid offers should be recovered.
+ AWAIT_READY(recoverResources);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ // Check metrics.
+ JSON::Object stats = Metrics();
+ EXPECT_EQ(1u, stats.values.count("master/tasks_dropped"));
+ EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
+ EXPECT_EQ(
+ 1u,
stats.values.count(
- "master/task_lost/source_master/reason_invalid_offers"));
+ "master/task_dropped/source_master/reason_invalid_offers"));
EXPECT_EQ(
1u,
- stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+ stats.values["master/task_dropped/source_master/reason_invalid_offers"]);
driver.stop();
driver.join();
}
-// Test ensures that an offer cannot appear more than once in offers
-// for launchTasks.
-TEST_F(MasterTest, LaunchDuplicateOfferTest)
+// This test ensures that an offer cannot appear more than once in the
+// offers provided to a single launchTasks call. A non-partition-aware
+// framework should receive TASK_LOST.
+TEST_F(MasterTest, LaunchDuplicateOfferLost)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -1671,15 +1783,103 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest)
// Check metrics.
JSON::Object stats = Metrics();
- EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+ EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
EXPECT_EQ(
1u,
- stats.values.count(
- "master/task_lost/source_master/reason_invalid_offers"));
+ stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+
+ driver.stop();
+ driver.join();
+}
+
+
+// This test ensures that an offer cannot appear more than once in the
+// offers provided to a single launchTasks call. A partition-aware
+// framework should receive TASK_DROPPED.
+TEST_F(MasterTest, LaunchDuplicateOfferDropped)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ // See LaunchCombinedOfferTest() for resource size motivation.
+ Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resources = Option<string>(stringify(fullSlave));
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), &containerizer, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ FrameworkInfo::Capability::PARTITION_AWARE);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // Test that the same offer cannot be used more than once.
+ // First, get an offer for the full slave.
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+ Resources resources(offers.get()[0].resources());
+ EXPECT_EQ(2, resources.cpus().get());
+ EXPECT_EQ(Megabytes(1024), resources.mem().get());
+
+ vector<OfferID> combinedOffers;
+ combinedOffers.push_back(offers.get()[0].id());
+ combinedOffers.push_back(offers.get()[0].id());
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(fullSlave);
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ Future<TaskStatus> status;
+
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> recoverResources =
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+ driver.launchTasks(combinedOffers, {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_DROPPED, status.get().state());
+ EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
+
+ // The resources of the invalid offers should be recovered.
+ AWAIT_READY(recoverResources);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ // Check metrics.
+ JSON::Object stats = Metrics();
+ EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
+ EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
EXPECT_EQ(
1u,
- stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+ stats.values["master/task_dropped/source_master/reason_invalid_offers"]);
driver.stop();
driver.join();