You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/19 21:07:26 UTC
[2/2] mesos git commit: Sent StatusUpdates if checkpointed resources
don't exist on the slave.
Sent TASK_LOST status updates (instead of failing a CHECK) if checkpointed resources don't exist on the slave.
No bug was observed (yet), but I realized I had forgotten about this in the
dynamic reservations patches.
Review: https://reviews.apache.org/r/35433
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/efeb1183
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/efeb1183
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/efeb1183
Branch: refs/heads/master
Commit: efeb1183760e4bd9dd73a2a65af16274673a721f
Parents: ccf6c25
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jun 19 12:03:47 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jun 19 12:03:48 2015 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 1 +
src/slave/slave.cpp | 110 ++++++++++++++++++++++++++++++++++-------
2 files changed, 93 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8df1211..81008ed 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -893,6 +893,7 @@ message TaskStatus {
REASON_MASTER_DISCONNECTED = 7;
REASON_MEMORY_LIMIT = 8;
REASON_RECONCILIATION = 9;
+ REASON_RESOURCES_UNKNOWN = 18;
REASON_SLAVE_DISCONNECTED = 10;
REASON_SLAVE_REMOVED = 11;
REASON_SLAVE_RESTARTED = 12;
http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6c539b5..19b7508 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1332,6 +1332,16 @@ void Slave::_runTask(
framework->pending[executorId].erase(task.task_id());
if (framework->pending[executorId].empty()) {
framework->pending.erase(executorId);
+ // NOTE: Ideally we would perform the following check here:
+ //
+ // if (framework->executors.empty() &&
+ // framework->pending.empty()) {
+ // removeFramework(framework);
+ // }
+ //
+ // However, we need 'framework' to stay valid for the rest of
+ // this function. As such, we perform the check before each of
+ // the 'return' statements below.
}
} else {
LOG(WARNING) << "Ignoring run task " << task.task_id()
@@ -1347,9 +1357,12 @@ void Slave::_runTask(
<< " of framework " << frameworkId
<< " because the framework is terminating";
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
+
return;
}
@@ -1373,6 +1386,8 @@ void Slave::_runTask(
// manager to stop retrying for its un-acked updates.
statusUpdate(update, UPID());
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
@@ -1380,28 +1395,75 @@ void Slave::_runTask(
return;
}
- // NOTE: If the task or executor uses persistent volumes, the slave
- // should already know about it. In case the slave doesn't know
- // about them (e.g., CheckpointResourcesMessage was dropped or came
- // out of order), we simply fail the slave to be safe.
- Resources volumes = Resources(task.resources()).persistentVolumes();
+ // NOTE: If the task or executor uses resources that are
+ // checkpointed on the slave (e.g. persistent volumes), we should
+ // already know about it. If the slave doesn't know about them (e.g.
+ // CheckpointResourcesMessage was dropped or came out of order),
+ // we send TASK_LOST status updates here since restarting the task
+ // may succeed in the event that CheckpointResourcesMessage arrives
+ // out of order.
+ Resources checkpointedTaskResources =
+ Resources(task.resources()).filter(needCheckpointing);
- foreach (const Resource& volume, volumes) {
- CHECK(checkpointedResources.contains(volume))
- << "Unknown persistent volume " << volume
- << " for task " << task.task_id()
- << " of framework " << frameworkId;
+ foreach (const Resource& resource, checkpointedTaskResources) {
+ if (!checkpointedResources.contains(resource)) {
+ LOG(WARNING) << "Unknown checkpointed resource " << resource
+ << " for task " << task.task_id()
+ << " of framework " << frameworkId;
+
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ TaskStatus::SOURCE_SLAVE,
+ "The checkpointed resources being used by the task are unknown to "
+ "the slave",
+ TaskStatus::REASON_RESOURCES_UNKNOWN);
+
+ statusUpdate(update, UPID());
+
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
}
if (task.has_executor()) {
- Resources volumes =
- Resources(task.executor().resources()).persistentVolumes();
-
- foreach (const Resource& volume, volumes) {
- CHECK(checkpointedResources.contains(volume))
- << "Unknown persistent volume " << volume
- << " for executor " << task.executor().executor_id()
- << " of framework " << frameworkId;
+ Resources checkpointedExecutorResources =
+ Resources(task.executor().resources()).filter(needCheckpointing);
+
+ foreach (const Resource& resource, checkpointedExecutorResources) {
+ if (!checkpointedResources.contains(resource)) {
+ LOG(WARNING) << "Unknown checkpointed resource " << resource
+ << " for executor " << task.executor().executor_id()
+ << " of framework " << frameworkId;
+
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ TaskStatus::SOURCE_SLAVE,
+ "The checkpointed resources being used by the executor are unknown "
+ "to the slave",
+ TaskStatus::REASON_RESOURCES_UNKNOWN,
+ task.executor().executor_id());
+
+ statusUpdate(update, UPID());
+
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
}
}
@@ -1415,6 +1477,12 @@ void Slave::_runTask(
<< " of framework " << frameworkId
<< " because the slave is terminating";
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
// We don't send a TASK_LOST here because the slave is
// terminating.
return;
@@ -1507,6 +1575,12 @@ void Slave::_runTask(
<< " is in unexpected state " << executor->state;
break;
}
+
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
}