Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/03 07:52:54 UTC
[mesos] 07/08: Removed code for tracking pending tasks.
This is an automated email from the ASF dual-hosted git repository.
asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 6a29061fb2f357d591f27d7541f92e0f73f0e944
Author: Andrei Sekretenko <as...@mesosphere.com>
AuthorDate: Thu Jan 30 15:42:07 2020 +0100
Removed code for tracking pending tasks.
Now that ACCEPT is authorized synchronously, there are no pending
tasks between dispatches on `Master` methods, so the pending-task
tracking code is no longer needed.
Review: https://reviews.apache.org/r/72099
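To make the reasoning behind the removal concrete, here is a minimal,
self-contained C++ sketch. It is not Mesos code: `AsyncMaster`, `SyncMaster`,
and the `deferred` queue are hypothetical stand-ins for the Master actor,
libprocess dispatching, and the authorizer. With asynchronous authorization,
accept() returns before the decision is known, so in-flight tasks must be
parked in a pending map that other master methods (kill, reconcile, DESTROY
validation) can observe in the interim. With synchronous authorization the
decision is made within the same dispatch, so no other method can run in
between and there is nothing for a pending map to track.

    // Hypothetical sketch only -- not the actual Mesos Master implementation.
    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct Task { std::string id; };

    // Asynchronous flavour: accept() finishes before authorization does, so the
    // task is visible as "pending" to anything that runs before the deferred
    // continuation (the analogue of Master::_accept()) fires.
    struct AsyncMaster
    {
      std::map<std::string, Task> pendingTasks;
      std::vector<std::function<void()>> deferred;  // stand-in for libprocess dispatches

      void accept(const Task& task)
      {
        pendingTasks[task.id] = task;               // must be tracked in the interim
        deferred.push_back([this, task] {
          pendingTasks.erase(task.id);              // continuation removes it again
          std::cout << "launched " << task.id << std::endl;
        });
      }
    };

    // Synchronous flavour: the authorization result is available immediately, so
    // no other dispatch can ever observe a half-processed ACCEPT.
    struct SyncMaster
    {
      void accept(const Task& task, bool authorized)
      {
        if (authorized) {
          std::cout << "launched " << task.id << std::endl;
        }
      }
    };

    int main()
    {
      AsyncMaster async;
      async.accept({"t1"});
      std::cout << "pending between dispatches: "
                << async.pendingTasks.size() << std::endl;  // prints 1
      for (auto& continuation : async.deferred) {
        continuation();
      }

      SyncMaster sync;
      sync.accept({"t2"}, true);                            // nothing is ever pending
      return 0;
    }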
---
 include/mesos/master/master.proto     |   2 +-
 include/mesos/v1/master/master.proto  |   2 +-
 src/master/http.cpp                   |  13 +-
 src/master/master.cpp                 | 258 +++-------------------------------
 src/master/master.hpp                 |  10 --
 src/master/readonly_handler.cpp       |  91 ------------
 src/master/validation.cpp             |  22 ---
 src/master/validation.hpp             |   1 -
 src/tests/master_validation_tests.cpp |  33 -----
 9 files changed, 22 insertions(+), 410 deletions(-)
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index 8c22802..021dadc 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -542,7 +542,7 @@ message Response {
message GetTasks {
// Tasks that are enqueued on the master waiting (e.g., authorizing)
// to be launched.
- repeated Task pending_tasks = 1;
+ repeated Task pending_tasks = 1 [deprecated=true];
// Tasks that have been forwarded to the agent for launch. This
// includes tasks that are staging or running; it also includes
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 40de358..488fe29 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -542,7 +542,7 @@ message Response {
message GetTasks {
// Tasks that are enqueued on the master waiting (e.g., authorizing)
// to be launched.
- repeated Task pending_tasks = 1;
+ repeated Task pending_tasks = 1 [deprecated=true];
// Tasks that have been forwarded to the agent for launch. This
// includes tasks that are staging or running; it also includes
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 67572a3..f1be402 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1029,8 +1029,7 @@ Future<Response> Master::Http::_destroyVolumes(
error = validation::operation::validate(
operation.destroy(),
slave->checkpointedResources,
- slave->usedResources,
- slave->pendingTasks);
+ slave->usedResources);
if (error.isSome()) {
return BadRequest("Invalid DESTROY operation: " + error->message);
@@ -3782,13 +3781,6 @@ Future<Response> Master::Http::_drainAgent(
// It's possible for the slave to be removed in the interim
// if it is marked unreachable.
if (slave != nullptr) {
- hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds;
- foreachpair (const FrameworkID& frameworkId,
- const auto& tasks,
- slave->pendingTasks) {
- pendingTaskIds[frameworkId] = tasks.keys();
- }
-
hashmap<FrameworkID, hashset<TaskID>> taskIds;
foreachpair (const FrameworkID& frameworkId,
const auto& tasks,
@@ -3798,8 +3790,7 @@ Future<Response> Master::Http::_drainAgent(
LOG(INFO)
<< "Transitioning agent " << slaveId << " to the DRAINING state"
- << "; agent has (pending tasks, tasks, operations) == ("
- << stringify(pendingTaskIds) << ", "
+ << "; agent has (tasks, operations) == ("
<< stringify(taskIds) << ", "
<< stringify(slave->operations.keys()) << ")";
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b09ce8e..7662c56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1178,10 +1178,6 @@ void Master::finalize()
removeInverseOffer(inverseOffer);
}
- // Remove pending tasks from the slave. Don't bother
- // recovering the resources in the allocator.
- slave->pendingTasks.clear();
-
// Terminate the slave observer.
terminate(slave->observer);
wait(slave->observer);
@@ -1198,10 +1194,6 @@ void Master::finalize()
foreachvalue (Framework* framework, frameworks.registered) {
allocator->removeFramework(framework->id());
- // Remove pending tasks from the framework. Don't bother
- // recovering the resources in the allocator.
- framework->pendingTasks.clear();
-
// No tasks/executors/offers should remain since the slaves
// have been removed.
CHECK(framework->tasks.empty());
@@ -4275,46 +4267,6 @@ void Master::accept(
LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
<< " on agent " << *slave << " for framework " << *framework;
- auto getOperationTasks =
- [](const Offer::Operation& operation) -> const RepeatedPtrField<TaskInfo>& {
- if (operation.type() == Offer::Operation::LAUNCH) {
- return operation.launch().task_infos();
- }
-
- if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
- return operation.launch_group().task_group().tasks();
- }
-
- UNREACHABLE();
- };
-
- // Add tasks to be launched to the framework's list of pending tasks
- // before authorizing.
- //
- // NOTE: If two tasks have the same ID, the second one will
- // not be put into 'framework->pendingTasks', therefore
- // will not be launched (and TASK_ERROR will be sent).
- // Unfortunately, we can't tell the difference between a
- // duplicate TaskID and getting killed while pending
- // (removed from the map). So it's possible that we send
- // a TASK_ERROR after a TASK_KILLED (see _accept())!
- for (const Offer::Operation& operation : accept.operations()) {
- if (operation.type() == Offer::Operation::LAUNCH ||
- operation.type() == Offer::Operation::LAUNCH_GROUP) {
- for (const TaskInfo& task : getOperationTasks(operation)) {
- if (!framework->pendingTasks.contains(task.task_id())) {
- framework->pendingTasks[task.task_id()] = task;
- }
-
- // Add to the slave's list of pending tasks.
- if (!slave->pendingTasks.contains(framework->id()) ||
- !slave->pendingTasks[framework->id()].contains(task.task_id())) {
- slave->pendingTasks[framework->id()][task.task_id()] = task;
- }
- }
- }
- }
-
// TODO(asekretenko): Dismantle `_accept(...)` (which, before synchronous
// authorization was introduced, used to be a deferred continuation of ACCEPT
// call processing, but now is kept only for limiting variable scopes) and
@@ -4376,15 +4328,6 @@ void Master::_accept(
}();
foreach (const TaskInfo& task, tasks) {
- // Remove the task from being pending.
- framework->pendingTasks.erase(task.task_id());
- if (slave != nullptr) {
- slave->pendingTasks[framework->id()].erase(task.task_id());
- if (slave->pendingTasks[framework->id()].empty()) {
- slave->pendingTasks.erase(framework->id());
- }
- }
-
const TaskStatus::Reason reason =
slave == nullptr ? TaskStatus::REASON_SLAVE_REMOVED
: TaskStatus::REASON_SLAVE_DISCONNECTED;
@@ -4664,8 +4607,7 @@ void Master::_accept(
Option<Error> error = validation::operation::validate(
operation.destroy(),
slave->checkpointedResources,
- slave->usedResources,
- slave->pendingTasks);
+ slave->usedResources);
error = error.isSome()
? Error(error->message + "; on agent " + stringify(*slave))
@@ -4844,29 +4786,6 @@ void Master::_accept(
case Offer::Operation::LAUNCH: {
foreach (const TaskInfo& task, operation.launch().task_infos()) {
- // The task will not be in `pendingTasks` if it has been
- // killed in the interim. No need to send TASK_KILLED in
- // this case as it has already been sent. Note however that
- // we cannot currently distinguish between the task being
- // killed and the task having a duplicate TaskID within
- // `pendingTasks`. Therefore we must still validate the task
- // to ensure we send the TASK_ERROR in the case that it has a
- // duplicate TaskID.
- //
- // TODO(bmahler): We may send TASK_ERROR after a TASK_KILLED
- // if a task was killed (removed from `pendingTasks`) *and*
- // the task is invalid or unauthorized here.
- //
- // TODO(asekretenko): Now that ACCEPT is authorized synchronously,
- // master state cannot change while the task is being authorized,
- // and all the code for tracking pending tasks can be removed.
- bool pending = framework->pendingTasks.contains(task.task_id());
- framework->pendingTasks.erase(task.task_id());
- slave->pendingTasks[framework->id()].erase(task.task_id());
- if (slave->pendingTasks[framework->id()].empty()) {
- slave->pendingTasks.erase(framework->id());
- }
-
const Option<Error> authorizationError =
authorized(ActionObject::taskLaunch(task, framework->info));
@@ -4931,7 +4850,7 @@ void Master::_accept(
}
// Add task.
- if (pending) {
+ {
Resources consumed;
bool launchExecutor = true;
@@ -5044,26 +4963,9 @@ void Master::_accept(
case Offer::Operation::LAUNCH_GROUP: {
// We must ensure that the entire group can be launched. This
// means all tasks in the group must be authorized and valid.
- // If any tasks in the group have been killed in the interim
- // we must kill the entire group.
const ExecutorInfo& executor = operation.launch_group().executor();
const TaskGroupInfo& taskGroup = operation.launch_group().task_group();
- // Remove all the tasks from being pending.
- hashset<TaskID> killed;
- foreach (const TaskInfo& task, taskGroup.tasks()) {
- bool pending = framework->pendingTasks.contains(task.task_id());
- framework->pendingTasks.erase(task.task_id());
- slave->pendingTasks[framework->id()].erase(task.task_id());
- if (slave->pendingTasks[framework->id()].empty()) {
- slave->pendingTasks.erase(framework->id());
- }
-
- if (!pending) {
- killed.insert(task.task_id());
- }
- }
-
// Note that we do not fill in the `ExecutorInfo.framework_id`
// since we do not have to support backwards compatibility like
// in the `Launch` operation case.
@@ -5110,10 +5012,6 @@ void Master::_accept(
if (error.isSome()) {
CHECK_SOME(reason);
-
- // NOTE: If some of these invalid or unauthorized tasks were
- // killed already, here we end up sending a TASK_ERROR after
- // having already sent TASK_KILLED.
foreach (const TaskInfo& task, taskGroup.tasks()) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id(),
@@ -5136,39 +5034,6 @@ void Master::_accept(
continue;
}
- // If task(s) were killed, send TASK_KILLED for
- // all of the remaining tasks, since a TaskGroup must
- // be delivered in its entirety.
- //
- // TODO(bmahler): Do this killing when processing
- // the `Kill` call, rather than doing it here.
- if (!killed.empty()) {
- foreach (const TaskInfo& task, taskGroup.tasks()) {
- if (!killed.contains(task.task_id())) {
- const StatusUpdate& update = protobuf::createStatusUpdate(
- framework->id(),
- task.slave_id(),
- task.task_id(),
- TASK_KILLED,
- TaskStatus::SOURCE_MASTER,
- None(),
- "A task within the task group was killed before"
- " delivery to the agent",
- TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
- metrics->tasks_killed++;
-
- // TODO(bmahler): Increment the task state source metric,
- // we currently cannot because it requires each source
- // requires a reason.
-
- forward(update, UPID(), framework);
- }
- }
-
- continue;
- }
-
// Now launch the task group!
RunTaskGroupMessage message;
message.mutable_framework()->CopyFrom(framework->info);
@@ -5558,22 +5423,15 @@ void Master::checkAndTransitionDrainingAgent(Slave* slave)
}
// Check if the agent has any tasks running or operations pending.
- if (!slave->pendingTasks.empty() ||
- !slave->tasks.empty() ||
+ if (!slave->tasks.empty() ||
!slave->operations.empty()) {
size_t numTasks = 0u;
foreachvalue (const auto& frameworkTasks, slave->tasks) {
numTasks += frameworkTasks.size();
}
- size_t numPendingTasks = 0u;
- foreachvalue (const auto& frameworkTasks, slave->pendingTasks) {
- numPendingTasks += frameworkTasks.size();
- }
-
VLOG(1)
<< "DRAINING Agent " << slaveId << " has "
- << numPendingTasks << " pending tasks, "
<< numTasks << " tasks, and "
<< slave->operations.size() << " operations";
return;
@@ -5751,36 +5609,6 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
++metrics->messages_kill_task;
- if (framework->pendingTasks.contains(taskId)) {
- // Remove from pending tasks.
- framework->pendingTasks.erase(taskId);
-
- if (slaveId.isSome()) {
- Slave* slave = slaves.registered.get(slaveId.get());
-
- if (slave != nullptr) {
- slave->pendingTasks[framework->id()].erase(taskId);
- if (slave->pendingTasks[framework->id()].empty()) {
- slave->pendingTasks.erase(framework->id());
- }
- }
- }
-
- const StatusUpdate& update = protobuf::createStatusUpdate(
- framework->id(),
- slaveId,
- taskId,
- TASK_KILLED,
- TaskStatus::SOURCE_MASTER,
- None(),
- "Killed before delivery to the agent",
- TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
- forward(update, UPID(), framework);
-
- return;
- }
-
Task* task = framework->getTask(taskId);
if (task == nullptr) {
LOG(WARNING) << "Cannot kill task " << taskId
@@ -9027,29 +8855,6 @@ void Master::reconcile(
LOG(INFO) << "Performing implicit task state reconciliation"
" for framework " << *framework;
- foreachvalue (const TaskInfo& task, framework->pendingTasks) {
- StatusUpdate update = protobuf::createStatusUpdate(
- framework->id(),
- task.slave_id(),
- task.task_id(),
- TASK_STAGING,
- TaskStatus::SOURCE_MASTER,
- None(),
- "Reconciliation: Latest task state",
- TaskStatus::REASON_RECONCILIATION);
-
- VLOG(1) << "Sending implicit reconciliation state "
- << update.status().state()
- << " for task " << update.status().task_id()
- << " of framework " << *framework;
-
- // TODO(bmahler): Consider using forward(); might lead to too
- // much logging.
- StatusUpdateMessage message;
- *message.mutable_update() = std::move(update);
- framework->send(message);
- }
-
foreachvalue (Task* task, framework->tasks) {
const TaskState& state = task->has_status_update_state()
? task->status_update_state()
@@ -9095,20 +8900,19 @@ void Master::reconcile(
<< " of framework " << *framework;
// Explicit reconciliation occurs for the following cases:
- // (1) Task is known, but pending: TASK_STAGING.
- // (2) Task is known: send the latest state.
- // (3) Task is unknown, slave is recovered: no-op.
- // (4) Task is unknown, slave is registered: TASK_GONE.
- // (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
- // (6) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
- // (7) Task is unknown, slave is unknown: TASK_UNKNOWN.
+ // (1) Task is known: send the latest state.
+ // (2) Task is unknown, slave is recovered: no-op.
+ // (3) Task is unknown, slave is registered: TASK_GONE.
+ // (4) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
+ // (5) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
+ // (6) Task is unknown, slave is unknown: TASK_UNKNOWN.
//
- // For case (3), if the slave ID is not provided, we err on the
+ // For case (2), if the slave ID is not provided, we err on the
// side of caution and do not reply if there are *any* recovered
// slaves that haven't reregistered, since the task could reside
// on one of these slaves.
//
- // For cases (4), (5), (6) and (7) TASK_LOST is sent instead if the
+ // For cases (3), (4), (5) and (6) TASK_LOST is sent instead if the
// framework has not opted-in to the PARTITION_AWARE capability.
foreach (const scheduler::Call::Reconcile::Task& t, reconcile.tasks()) {
Option<SlaveID> slaveId = None();
@@ -9119,20 +8923,8 @@ void Master::reconcile(
Option<StatusUpdate> update = None();
Task* task = framework->getTask(t.task_id());
- if (framework->pendingTasks.contains(t.task_id())) {
- // (1) Task is known, but pending: TASK_STAGING.
- const TaskInfo& task_ = framework->pendingTasks[t.task_id()];
- update = protobuf::createStatusUpdate(
- framework->id(),
- task_.slave_id(),
- task_.task_id(),
- TASK_STAGING,
- TaskStatus::SOURCE_MASTER,
- None(),
- "Reconciliation: Latest task state",
- TaskStatus::REASON_RECONCILIATION);
- } else if (task != nullptr) {
- // (2) Task is known: send the latest status update state.
+ if (task != nullptr) {
+ // (1) Task is known: send the latest status update state.
const TaskState& state = task->has_status_update_state()
? task->status_update_state()
: task->state();
@@ -9157,7 +8949,7 @@ void Master::reconcile(
protobuf::getTaskContainerStatus(*task));
} else if ((slaveId.isSome() && slaves.recovered.contains(slaveId.get())) ||
(slaveId.isNone() && !slaves.recovered.empty())) {
- // (3) Task is unknown, slave is recovered: no-op. The framework
+ // (2) Task is unknown, slave is recovered: no-op. The framework
// will have to retry this and will not receive a response until
// the agent either registers, or is marked unreachable after the
// timeout.
@@ -9168,7 +8960,7 @@ void Master::reconcile(
"some agents have")
<< " not yet reregistered with the master";
} else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
- // (4) Task is unknown, slave is registered: TASK_GONE. If the
+ // (3) Task is unknown, slave is registered: TASK_GONE. If the
// framework does not have the PARTITION_AWARE capability, send
// TASK_LOST for backward compatibility.
TaskState taskState = TASK_GONE;
@@ -9186,7 +8978,7 @@ void Master::reconcile(
"Reconciliation: Task is unknown to the agent",
TaskStatus::REASON_RECONCILIATION);
} else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
- // (5) Slave is unreachable: TASK_UNREACHABLE. If the framework
+ // (4) Slave is unreachable: TASK_UNREACHABLE. If the framework
// does not have the PARTITION_AWARE capability, send TASK_LOST
// for backward compatibility. In either case, the status update
// also includes the time when the slave was marked unreachable.
@@ -9213,7 +9005,7 @@ void Master::reconcile(
None(),
unreachableTime);
} else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
- // (6) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
+ // (5) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
// does not have the PARTITION_AWARE capability, send TASK_LOST
// for backward compatibility.
TaskState taskState = TASK_GONE_BY_OPERATOR;
@@ -9231,7 +9023,7 @@ void Master::reconcile(
"Reconciliation: Task is gone",
TaskStatus::REASON_RECONCILIATION);
} else {
- // (7) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
+ // (6) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
// framework does not have the PARTITION_AWARE capability, send
// TASK_LOST for backward compatibility.
TaskState taskState = TASK_UNKNOWN;
@@ -10504,18 +10296,12 @@ void Master::removeFramework(Framework* framework)
CHECK(framework->inverseOffers.empty());
foreachvalue (Slave* slave, slaves.registered) {
- // Remove the pending tasks from the slave.
- slave->pendingTasks.erase(framework->id());
-
// Tell slaves to shutdown the framework.
ShutdownFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
send(slave->pid, message);
}
- // Remove the pending tasks from the framework.
- framework->pendingTasks.clear();
-
// Remove pointers to the framework's tasks in slaves and mark those
// tasks as completed.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
@@ -11039,9 +10825,6 @@ void Master::_removeSlave(
}
}
- // Remove the pending tasks from the slave.
- slave->pendingTasks.clear();
-
// Mark the slave as being removed.
slaves.registered.remove(slave);
slaves.removed.put(slave->id, Nothing());
@@ -12256,11 +12039,6 @@ double Master::_tasks_staging()
{
double count = 0.0;
- // Add the tasks pending validation / authorization.
- foreachvalue (Framework* framework, frameworks.registered) {
- count += framework->pendingTasks.size();
- }
-
foreachvalue (Slave* slave, slaves.registered) {
typedef hashmap<TaskID, Task*> TaskMap;
foreachvalue (const TaskMap& tasks, slave->tasks) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7281815..34ef2f1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -237,12 +237,6 @@ Slave(Master* const _master,
// the executors.
hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
- // Tasks that have not yet been launched because they are currently
- // being authorized. This is similar to Framework's pendingTasks but we
- // track pendingTasks per agent separately to determine if any offer
- // operation for this agent would change resources requested by these tasks.
- hashmap<FrameworkID, hashmap<TaskID, TaskInfo>> pendingTasks;
-
// Tasks present on this slave.
//
// TODO(bmahler): Make this private to enforce that `addTask()` and
@@ -2570,10 +2564,6 @@ struct Framework
process::Time reregisteredTime;
process::Time unregisteredTime;
- // Tasks that have not yet been launched because they are currently
- // being authorized.
- hashmap<TaskID, TaskInfo> pendingTasks;
-
// TODO(bmahler): Make this private to enforce that `addTask()` and
// `removeTask()` are used, and provide a const view into the tasks.
hashmap<TaskID, Task*> tasks;
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index f9c0006..e4a3134 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -191,47 +191,6 @@ void FullFrameworkWriter::operator()(JSON::ObjectWriter* writer) const
// Model all of the tasks associated with a framework.
writer->field("tasks", [this](JSON::ArrayWriter* writer) {
- foreachvalue (const TaskInfo& taskInfo, framework_->pendingTasks) {
- // Skip unauthorized tasks.
- if (!approvers_->approved<VIEW_TASK>(taskInfo, framework_->info)) {
- continue;
- }
-
- writer->element([this, &taskInfo](JSON::ObjectWriter* writer) {
- writer->field("id", taskInfo.task_id().value());
- writer->field("name", taskInfo.name());
- writer->field("framework_id", framework_->id().value());
-
- writer->field(
- "executor_id",
- taskInfo.executor().executor_id().value());
-
- writer->field("slave_id", taskInfo.slave_id().value());
- writer->field("state", TaskState_Name(TASK_STAGING));
- writer->field("resources", taskInfo.resources());
-
- // Tasks are not allowed to mix resources allocated to
- // different roles, see MESOS-6636.
- writer->field(
- "role",
- taskInfo.resources().begin()->allocation_info().role());
-
- writer->field("statuses", std::initializer_list<TaskStatus>{});
-
- if (taskInfo.has_labels()) {
- writer->field("labels", taskInfo.labels());
- }
-
- if (taskInfo.has_discovery()) {
- writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
- }
-
- if (taskInfo.has_container()) {
- writer->field("container", JSON::Protobuf(taskInfo.container()));
- }
- });
- }
-
foreachvalue (Task* task, framework_->tasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
@@ -511,11 +470,6 @@ public:
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
- foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
- frameworksToSlaves[frameworkId].insert(taskInfo.slave_id());
- slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId);
- }
-
foreachvalue (const Task* task, framework->tasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
@@ -632,11 +586,6 @@ public:
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
- foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
- frameworkTaskSummaries[frameworkId].staging++;
- slaveTaskSummaries[taskInfo.slave_id()].staging++;
- }
-
foreachvalue (const Task* task, framework->tasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
@@ -1983,26 +1932,6 @@ function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks(
int field;
- // Pending tasks.
- field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- [&](JSON::ArrayWriter* writer) {
- foreach (const Framework* framework, frameworks) {
- foreachvalue (const TaskInfo& t, framework->pendingTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
- continue;
- }
-
- Task task =
- protobuf::createTask(t, TASK_STAGING, framework->id());
-
- writer->element(asV1Protobuf(task));
- }
- }
- });
-
// Active tasks.
field = v1::master::Response::GetTasks::kTasksFieldNumber;
writer->field(
@@ -2095,26 +2024,6 @@ string Master::ReadOnlyHandler::serializeGetTasks(
google::protobuf::io::CodedOutputStream writer(&stream);
foreach (const Framework* framework, frameworks) {
- // Pending tasks.
- foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
- // Skip unauthorized tasks.
- if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
- continue;
- }
-
- // TODO(bmahler): Consider not constructing the temporary task
- // object and instead serialize directly. Since we don't expect
- // a large number of pending tasks, we currently don't bother
- // with the more efficient approach.
- //
- // *getTasks.add_pending_tasks() =
- // protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- WireFormatLite2::WriteMessageWithoutCachedSizes(
- mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
- protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
- &writer);
- }
-
// Active tasks.
foreachvalue (const Task* task, framework->tasks) {
// Skip unauthorized tasks.
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 2f80536..084f281 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2543,7 +2543,6 @@ Option<Error> validate(
const Offer::Operation::Destroy& destroy,
const Resources& checkpointedResources,
const hashmap<FrameworkID, Resources>& usedResources,
- const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks,
const Option<FrameworkInfo>& frameworkInfo)
{
// The operation can either contain allocated resources
@@ -2601,27 +2600,6 @@ Option<Error> validate(
}
}
- // Ensure that the volumes being destroyed are not requested by any pending
- // task. This check is mainly to validate destruction of shared volumes.
- // Note that resource requirements in pending tasks are not validated yet
- // so it is possible that the DESTROY validation fails due to invalid
- // pending tasks.
- typedef hashmap<TaskID, TaskInfo> TaskMap;
- foreachvalue(const TaskMap& tasks, pendingTasks) {
- foreachvalue (const TaskInfo& task, tasks) {
- Resources resources = task.resources();
- if (task.has_executor()) {
- resources += task.executor().resources();
- }
-
- foreach (const Resource& volume, destroy.volumes()) {
- if (unallocated(resources).contains(volume)) {
- return Error("Persistent volume in pending tasks");
- }
- }
- }
- }
-
return None();
}
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index b289713..7fe8f08 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -326,7 +326,6 @@ Option<Error> validate(
const Offer::Operation::Destroy& destroy,
const Resources& checkpointedResources,
const hashmap<FrameworkID, Resources>& usedResources,
- const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks,
const Option<FrameworkInfo>& frameworkInfo = None());
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index e92ff59..8d5e74e 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1627,39 +1627,6 @@ TEST_F(DestroyOperationValidationTest, SharedPersistentVolumeInUse)
}
-// This test verifies that DESTROY for shared persistent volumes is valid
-// when the volumes are not assigned to any pending task.
-TEST_F(DestroyOperationValidationTest, SharedPersistentVolumeInPendingTasks)
-{
- Resource cpus = Resources::parse("cpus", "1", "*").get();
- Resource mem = Resources::parse("mem", "5", "*").get();
- Resource sharedDisk = createDiskResource(
- "50", "role1", "1", "path1", None(), true); // Shared.
-
- SlaveID slaveId;
- slaveId.set_value("S1");
-
- // Add a task using shared volume as pending tasks.
- TaskInfo task = createTask(slaveId, sharedDisk, "sleep 1000");
-
- hashmap<FrameworkID, hashmap<TaskID, TaskInfo>> pendingTasks;
- FrameworkID frameworkId1;
- frameworkId1.set_value("id1");
- pendingTasks[frameworkId1] = {{task.task_id(), task}};
-
- // Destroy `sharedDisk` which is assigned to `task`.
- Offer::Operation::Destroy destroy;
- destroy.add_volumes()->CopyFrom(sharedDisk);
-
- EXPECT_SOME(operation::validate(destroy, sharedDisk, {}, pendingTasks));
-
- // Remove all pending tasks.
- pendingTasks.clear();
-
- EXPECT_NONE(operation::validate(destroy, sharedDisk, {}, pendingTasks));
-}
-
-
TEST_F(DestroyOperationValidationTest, UnknownPersistentVolume)
{
Resource volume = Resources::parse("disk", "128", "role1").get();