You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/19 21:07:25 UTC
[1/2] mesos git commit: Formatting cleanup in the Slave.
Repository: mesos
Updated Branches:
refs/heads/master c2a112f36 -> efeb11837
Formatting cleanup in the Slave.
This patch includes fixes for incorrect formatting as well as
acceptable-but-can-be-improved formatting.
Review: https://reviews.apache.org/r/35635
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ccf6c254
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ccf6c254
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ccf6c254
Branch: refs/heads/master
Commit: ccf6c254a1620e512ec66f1e20644d47c12c6832
Parents: c2a112f
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jun 19 12:03:27 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jun 19 12:03:28 2015 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 209 +++++++++++++++++++----------------------------
1 file changed, 82 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ccf6c254/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a5ad29f..6c539b5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -326,16 +326,15 @@ void Slave::initialize()
<< "' for --gc_disk_headroom. Must be between 0.0 and 1.0.";
}
- Try<Nothing> initialize = resourceEstimator->initialize(
- defer(self(), &Self::usage));
+ Try<Nothing> initialize =
+ resourceEstimator->initialize(defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the resource estimator: "
<< initialize.error();
}
- initialize = qosController->initialize(
- defer(self(), &Self::usage));
+ initialize = qosController->initialize(defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the QoS Controller: "
@@ -512,8 +511,8 @@ void Slave::initialize()
lambda::_1,
flags.external_log_file.get()));
} else if (flags.log_dir.isSome()) {
- Try<string> log = logging::getLogFile(
- logging::getLogSeverity(flags.logging_level));
+ Try<string> log =
+ logging::getLogFile(logging::getLogSeverity(flags.logging_level));
if (log.isError()) {
LOG(ERROR) << "Slave log file cannot be found: " << log.error();
@@ -763,10 +762,7 @@ void Slave::authenticate()
authenticatee->authenticate(master.get(), self(), credential.get())
.onAny(defer(self(), &Self::_authenticate));
- delay(Seconds(5),
- self(),
- &Self::authenticationTimeout,
- authenticating.get());
+ delay(Seconds(5), self(), &Self::authenticationTimeout, authenticating.get());
}
@@ -843,7 +839,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
CHECK_SOME(master);
- switch(state) {
+ switch (state) {
case DISCONNECTED: {
LOG(INFO) << "Registered with master " << master.get()
<< "; given slave ID " << slaveId;
@@ -852,8 +848,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
// in "fetcher.hpp"".
Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
if (recovered.isError()) {
- LOG(FATAL) << "Could not initialize fetcher cache: "
- << recovered.error();
+ LOG(FATAL) << "Could not initialize fetcher cache: "
+ << recovered.error();
}
state = RUNNING;
@@ -876,11 +872,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
// in case we never receive an initial ping.
Clock::cancel(pingTimer);
- pingTimer = delay(
- MASTER_PING_TIMEOUT(),
- self(),
- &Slave::pingTimeout,
- detection);
+ pingTimer =
+ delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection);
break;
}
@@ -935,7 +928,7 @@ void Slave::reregistered(
<< "(expected: " << info.id() << "). Committing suicide";
}
- switch(state) {
+ switch (state) {
case DISCONNECTED:
LOG(INFO) << "Re-registered with master " << master.get();
state = RUNNING;
@@ -1266,11 +1259,12 @@ void Slave::runTask(
// executors to this framework and remove it from that list.
// TODO(brenden): Consider using stout/cache.hpp instead of boost
// circular_buffer.
- for (boost::circular_buffer<Owned<Framework>>::iterator i =
- completedFrameworks.begin(); i != completedFrameworks.end(); ++i) {
- if ((*i)->id() == frameworkId) {
- framework->completedExecutors = (*i)->completedExecutors;
- completedFrameworks.erase(i);
+ for (auto it = completedFrameworks.begin(), end = completedFrameworks.end();
+ it != end;
+ ++it) {
+ if ((*it)->id() == frameworkId) {
+ framework->completedExecutors = (*it)->completedExecutors;
+ completedFrameworks.erase(it);
break;
}
}
@@ -1298,8 +1292,7 @@ void Slave::runTask(
}
// Unschedule executor meta directory.
- path = paths::getExecutorPath(
- metaDir, info.id(), frameworkId, executorId);
+ path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
if (os::exists(path)) {
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
@@ -1308,12 +1301,7 @@ void Slave::runTask(
// Run the task after the unschedules are done.
unschedule.onAny(
- defer(self(),
- &Self::_runTask,
- lambda::_1,
- frameworkInfo,
- pid,
- task));
+ defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task));
}
@@ -1330,10 +1318,10 @@ void Slave::_runTask(
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- LOG(WARNING) << "Ignoring run task " << task.task_id()
- << " because the framework " << frameworkId
- << " does not exist";
- return;
+ LOG(WARNING) << "Ignoring run task " << task.task_id()
+ << " because the framework " << frameworkId
+ << " does not exist";
+ return;
}
const ExecutorInfo& executorInfo = getExecutorInfo(frameworkId, task);
@@ -1343,7 +1331,7 @@ void Slave::_runTask(
framework->pending[executorId].contains(task.task_id())) {
framework->pending[executorId].erase(task.task_id());
if (framework->pending[executorId].empty()) {
- framework->pending.erase(executorId);
+ framework->pending.erase(executorId);
}
} else {
LOG(WARNING) << "Ignoring run task " << task.task_id()
@@ -1504,14 +1492,13 @@ void Slave::_runTask(
}
containerizer->update(executor->containerId, resources)
- .onAny(defer(
- self(),
- &Self::runTasks,
- lambda::_1,
- frameworkId,
- executorId,
- executor->containerId,
- list<TaskInfo>({task})));
+ .onAny(defer(self(),
+ &Self::runTasks,
+ lambda::_1,
+ frameworkId,
+ executorId,
+ executor->containerId,
+ list<TaskInfo>({task})));
break;
}
default:
@@ -1705,10 +1692,10 @@ void Slave::killTask(
framework->pending[executorId].erase(taskId);
if (framework->pending[executorId].empty()) {
- framework->pending.erase(executorId);
- if (framework->pending.empty() && framework->executors.empty()) {
- removeFramework(framework);
- }
+ framework->pending.erase(executorId);
+ if (framework->pending.empty() && framework->executors.empty()) {
+ removeFramework(framework);
+ }
}
return;
}
@@ -1716,7 +1703,7 @@ void Slave::killTask(
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
- LOG(WARNING) << "Cannot kill task " << taskId
+ LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
<< " because no corresponding executor is running";
// We send a TASK_LOST update because this task has never
@@ -2188,7 +2175,7 @@ void Slave::_statusUpdateAcknowledgement(
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
LOG(ERROR) << "Status update acknowledgement (UUID: " << uuid
- << ") for task " << taskId
+ << ") for task " << taskId
<< " of unknown executor";
return;
}
@@ -2336,14 +2323,13 @@ void Slave::registerExecutor(
}
containerizer->update(executor->containerId, resources)
- .onAny(defer(
- self(),
- &Self::runTasks,
- lambda::_1,
- frameworkId,
- executorId,
- executor->containerId,
- executor->queuedTasks.values()));
+ .onAny(defer(self(),
+ &Self::runTasks,
+ lambda::_1,
+ frameworkId,
+ executorId,
+ executor->containerId,
+ executor->queuedTasks.values()));
break;
}
default:
@@ -2433,13 +2419,12 @@ void Slave::reregisterExecutor(
// Tell the containerizer to update the resources.
containerizer->update(executor->containerId, executor->resources)
- .onAny(defer(
- self(),
- &Self::_reregisterExecutor,
- lambda::_1,
- frameworkId,
- executorId,
- executor->containerId));
+ .onAny(defer(self(),
+ &Self::_reregisterExecutor,
+ lambda::_1,
+ frameworkId,
+ executorId,
+ executor->containerId));
hashmap<TaskID, TaskInfo> unackedTasks;
foreach (const TaskInfo& task, tasks) {
@@ -2461,9 +2446,9 @@ void Slave::reregisterExecutor(
foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_STAGING &&
!unackedTasks.contains(task->task_id())) {
- LOG(INFO)
- << "Transitioning STAGED task " << task->task_id() << " to LOST"
- << " because it is unknown to the executor " << executorId;
+ LOG(INFO) << "Transitioning STAGED task " << task->task_id()
+ << " to LOST because it is unknown to the executor "
+ << executorId;
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
@@ -2715,24 +2700,12 @@ void Slave::_statusUpdate(
if (checkpoint) {
// Ask the status update manager to checkpoint and reliably send the update.
- statusUpdateManager->update(
- update,
- info.id(),
- executorId,
- containerId)
- .onAny(defer(self(),
- &Slave::__statusUpdate,
- lambda::_1,
- update,
- pid));
+ statusUpdateManager->update(update, info.id(), executorId, containerId)
+ .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
} else {
// Ask the status update manager to just retry the update.
statusUpdateManager->update(update, info.id())
- .onAny(defer(self(),
- &Slave::__statusUpdate,
- lambda::_1,
- update,
- pid));
+ .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
}
}
@@ -2920,11 +2893,8 @@ void Slave::pingOld(const UPID& from, const string& body)
// when this occurs.
Clock::cancel(pingTimer);
- pingTimer = delay(
- MASTER_PING_TIMEOUT(),
- self(),
- &Slave::pingTimeout,
- detection);
+ pingTimer =
+ delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection);
send(from, "PONG");
}
@@ -2952,11 +2922,8 @@ void Slave::ping(const UPID& from, bool connected)
// when this occurs.
Clock::cancel(pingTimer);
- pingTimer = delay(
- MASTER_PING_TIMEOUT(),
- self(),
- &Slave::pingTimeout,
- detection);
+ pingTimer =
+ delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection);
send(from, PongSlaveMessage());
}
@@ -3065,21 +3032,21 @@ ExecutorInfo Slave::getExecutorInfo(
task.command().uris());
if (task.command().has_environment()) {
- executor.mutable_command()->mutable_environment()->MergeFrom(
- task.command().environment());
+ executor.mutable_command()->mutable_environment()->MergeFrom(
+ task.command().environment());
}
if (task.command().has_container()) {
- executor.mutable_command()->mutable_container()->MergeFrom(
- task.command().container());
+ executor.mutable_command()->mutable_container()->MergeFrom(
+ task.command().container());
}
if (task.command().has_user()) {
- executor.mutable_command()->set_user(task.command().user());
+ executor.mutable_command()->set_user(task.command().user());
}
- Result<string> path = os::realpath(
- path::join(flags.launcher_dir, "mesos-executor"));
+ Result<string> path =
+ os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
// Explicitly set 'shell' to true since we want to use the shell
// for running the mesos-executor (and even though this is the
@@ -3091,9 +3058,7 @@ ExecutorInfo Slave::getExecutorInfo(
} else {
executor.mutable_command()->set_value(
"echo '" +
- (path.isError()
- ? path.error()
- : "No such file or directory") +
+ (path.isError() ? path.error() : "No such file or directory") +
"'; exit 1");
}
@@ -3122,8 +3087,7 @@ ExecutorInfo Slave::getExecutorInfo(
// Add in any default ContainerInfo.
if (!executor.has_container() && flags.default_container_info.isSome()) {
- executor.mutable_container()->CopyFrom(
- flags.default_container_info.get());
+ executor.mutable_container()->CopyFrom(flags.default_container_info.get());
}
return executor;
@@ -4141,9 +4105,7 @@ void Slave::qosCorrections()
void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
{
// Make sure correction handler is scheduled again.
- delay(flags.qos_correction_interval_min,
- self(),
- &Self::qosCorrections);
+ delay(flags.qos_correction_interval_min, self(), &Self::qosCorrections);
// Verify slave state.
CHECK(state == RECOVERING || state == DISCONNECTED ||
@@ -4275,8 +4237,7 @@ Future<ResourceUsage> Slave::usage()
}
return await(futures).then(
- [usage](const list<Future<ResourceStatistics>>& futures)
- -> Future<ResourceUsage> {
+ [usage](const list<Future<ResourceStatistics>>& futures) {
// NOTE: We add ResourceUsage::Executor to 'usage' the same
// order as we push future to 'futures'. So the variables
// 'future' and 'executor' below should be in sync.
@@ -4298,7 +4259,7 @@ Future<ResourceUsage> Slave::usage()
}
}
- return *usage;
+ return Future<ResourceUsage>(*usage);
});
}
@@ -4442,8 +4403,8 @@ void Slave::sendExecutorTerminatedStatusUpdate(
taskId,
taskState,
TaskStatus::SOURCE_SLAVE,
- termination.isReady() ? termination.get().message() :
- "Abnormal executor termination",
+ termination.isReady() ? termination.get().message()
+ : "Abnormal executor termination",
reason,
executor->id),
UPID());
@@ -4635,10 +4596,7 @@ Executor* Framework::launchExecutor(
<< " in work directory '" << directory << "'";
slave->files->attach(executor->directory, executor->directory)
- .onAny(defer(slave,
- &Slave::fileAttached,
- lambda::_1,
- executor->directory));
+ .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
// Tell the containerizer to launch the executor.
// NOTE: We modify the ExecutorInfo to include the task's
@@ -4686,11 +4644,11 @@ Executor* Framework::launchExecutor(
}
launch.onAny(defer(slave,
- &Slave::executorLaunched,
- id(),
- executor->id,
- containerId,
- lambda::_1));
+ &Slave::executorLaunched,
+ id(),
+ executor->id,
+ containerId,
+ lambda::_1));
// Make sure the executor registers within the given timeout.
delay(slave->flags.executor_registration_timeout,
@@ -4818,10 +4776,7 @@ void Framework::recoverExecutor(const ExecutorState& state)
// Expose the executor's files.
slave->files->attach(executor->directory, executor->directory)
- .onAny(defer(slave,
- &Slave::fileAttached,
- lambda::_1,
- executor->directory));
+ .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
// Add the executor to the framework.
executors[executor->id] = executor;
[2/2] mesos git commit: Sent StatusUpdates if checkpointed resources
don't exist on the slave.
Posted by be...@apache.org.
Sent StatusUpdates if checkpointed resources don't exist on the slave.
No bug was observed (yet), but I realized I had forgotten about this
in the dynamic reservations patches.
Review: https://reviews.apache.org/r/35433
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/efeb1183
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/efeb1183
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/efeb1183
Branch: refs/heads/master
Commit: efeb1183760e4bd9dd73a2a65af16274673a721f
Parents: ccf6c25
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jun 19 12:03:47 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jun 19 12:03:48 2015 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 1 +
src/slave/slave.cpp | 110 ++++++++++++++++++++++++++++++++++-------
2 files changed, 93 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8df1211..81008ed 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -893,6 +893,7 @@ message TaskStatus {
REASON_MASTER_DISCONNECTED = 7;
REASON_MEMORY_LIMIT = 8;
REASON_RECONCILIATION = 9;
+ REASON_RESOURCES_UNKNOWN = 18;
REASON_SLAVE_DISCONNECTED = 10;
REASON_SLAVE_REMOVED = 11;
REASON_SLAVE_RESTARTED = 12;
http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6c539b5..19b7508 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1332,6 +1332,16 @@ void Slave::_runTask(
framework->pending[executorId].erase(task.task_id());
if (framework->pending[executorId].empty()) {
framework->pending.erase(executorId);
+ // NOTE: Ideally we would perform the following check here:
+ //
+ // if (framework->executors.empty() &&
+ // framework->pending.empty()) {
+ // removeFramework(framework);
+ // }
+ //
+ // However, we need 'framework' to stay valid for the rest of
+ // this function. As such, we perform the check before each of
+ // the 'return' statements below.
}
} else {
LOG(WARNING) << "Ignoring run task " << task.task_id()
@@ -1347,9 +1357,12 @@ void Slave::_runTask(
<< " of framework " << frameworkId
<< " because the framework is terminating";
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
+
return;
}
@@ -1373,6 +1386,8 @@ void Slave::_runTask(
// manager to stop retrying for its un-acked updates.
statusUpdate(update, UPID());
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
@@ -1380,28 +1395,75 @@ void Slave::_runTask(
return;
}
- // NOTE: If the task or executor uses persistent volumes, the slave
- // should already know about it. In case the slave doesn't know
- // about them (e.g., CheckpointResourcesMessage was dropped or came
- // out of order), we simply fail the slave to be safe.
- Resources volumes = Resources(task.resources()).persistentVolumes();
+ // NOTE: If the task or executor uses resources that are
+ // checkpointed on the slave (e.g. persistent volumes), we should
+ // already know about it. If the slave doesn't know about them (e.g.
+ // CheckpointResourcesMessage was dropped or came out of order),
+ // we send TASK_LOST status updates here since restarting the task
+ // may succeed in the event that CheckpointResourcesMessage arrives
+ // out of order.
+ Resources checkpointedTaskResources =
+ Resources(task.resources()).filter(needCheckpointing);
- foreach (const Resource& volume, volumes) {
- CHECK(checkpointedResources.contains(volume))
- << "Unknown persistent volume " << volume
- << " for task " << task.task_id()
- << " of framework " << frameworkId;
+ foreach (const Resource& resource, checkpointedTaskResources) {
+ if (!checkpointedResources.contains(resource)) {
+ LOG(WARNING) << "Unknown checkpointed resource " << resource
+ << " for task " << task.task_id()
+ << " of framework " << frameworkId;
+
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ TaskStatus::SOURCE_SLAVE,
+ "The checkpointed resources being used by the task are unknown to "
+ "the slave",
+ TaskStatus::REASON_RESOURCES_UNKNOWN);
+
+ statusUpdate(update, UPID());
+
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
}
if (task.has_executor()) {
- Resources volumes =
- Resources(task.executor().resources()).persistentVolumes();
-
- foreach (const Resource& volume, volumes) {
- CHECK(checkpointedResources.contains(volume))
- << "Unknown persistent volume " << volume
- << " for executor " << task.executor().executor_id()
- << " of framework " << frameworkId;
+ Resources checkpointedExecutorResources =
+ Resources(task.executor().resources()).filter(needCheckpointing);
+
+ foreach (const Resource& resource, checkpointedExecutorResources) {
+ if (!checkpointedResources.contains(resource)) {
+ LOG(WARNING) << "Unknown checkpointed resource " << resource
+ << " for executor " << task.executor().executor_id()
+ << " of framework " << frameworkId;
+
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ TaskStatus::SOURCE_SLAVE,
+ "The checkpointed resources being used by the executor are unknown "
+ "to the slave",
+ TaskStatus::REASON_RESOURCES_UNKNOWN,
+ task.executor().executor_id());
+
+ statusUpdate(update, UPID());
+
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
}
}
@@ -1415,6 +1477,12 @@ void Slave::_runTask(
<< " of framework " << frameworkId
<< " because the slave is terminating";
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
// We don't send a TASK_LOST here because the slave is
// terminating.
return;
@@ -1507,6 +1575,12 @@ void Slave::_runTask(
<< " is in unexpected state " << executor->state;
break;
}
+
+ // Refer to the comment after 'framework->pending.erase' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
}