You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/10/23 03:27:39 UTC
mesos git commit: Added output operator for Executor struct in agent.
Repository: mesos
Updated Branches:
refs/heads/master 837a13f3b -> 02c7d93ce
Added output operator for Executor struct in agent.
Review: https://reviews.apache.org/r/39569
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02c7d93c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02c7d93c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02c7d93c
Branch: refs/heads/master
Commit: 02c7d93ceefce19743b0e043ead62fb02a160dbd
Parents: 837a13f
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Oct 22 18:25:55 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Thu Oct 22 18:27:15 2015 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 141 ++++++++++++++++++-----------------------------
src/slave/slave.hpp | 1 +
2 files changed, 56 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/02c7d93c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e9f2d1b..582411c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1618,8 +1618,7 @@ void Slave::_runTask(
// Queue task if the executor has not yet registered.
LOG(INFO) << "Queuing task '" << task.task_id()
- << "' for executor " << executorId
- << " of framework '" << frameworkId;
+ << "' for executor " << *executor;
executor->queuedTasks[task.task_id()] = task;
break;
@@ -1632,8 +1631,7 @@ void Slave::_runTask(
// Queue task until the containerizer is updated with new
// resource limits (MESOS-998).
LOG(INFO) << "Queuing task '" << task.task_id()
- << "' for executor " << executorId
- << " of framework '" << frameworkId;
+ << "' for executor " << *executor;
executor->queuedTasks[task.task_id()] = task;
@@ -1660,9 +1658,8 @@ void Slave::_runTask(
break;
}
default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id()
- << " is in unexpected state " << executor->state;
+ LOG(FATAL) << "Executor '" << *executor << " is in unexpected state "
+ << executor->state;
break;
}
@@ -1747,8 +1744,7 @@ void Slave::runTasks(
// when the original instance of the executor was shutting down.
if (executor->containerId != containerId) {
LOG(WARNING) << "Ignoring sending queued tasks '" << taskIds
- << " to executor '" << executorId
- << "' of framework " << frameworkId
+ << " to executor " << *executor
<< " because the target container " << containerId
<< " has exited";
return;
@@ -1765,8 +1761,7 @@ void Slave::runTasks(
// 'executorTerminated'.
if (executor->state != Executor::RUNNING) {
LOG(WARNING) << "Ignoring sending queued tasks " << taskIds
- << " to executor '" << executorId
- << "' of framework " << frameworkId
+ << " to executor " << *executor
<< " because the executor is in "
<< executor->state << " state";
return;
@@ -1777,8 +1772,7 @@ void Slave::runTasks(
// status update because it should be handled in 'killTask'.
if (!executor->queuedTasks.contains(task.task_id())) {
LOG(WARNING) << "Ignoring sending queued task '" << task.task_id()
- << "' to executor '" << executorId
- << "' of framework " << frameworkId
+ << "' to executor " << *executor
<< " because the task has been killed";
continue;
}
@@ -1789,8 +1783,7 @@ void Slave::runTasks(
executor->addTask(task);
LOG(INFO) << "Sending queued task '" << task.task_id()
- << "' to executor '" << executor->id
- << "' of framework " << framework->id();
+ << "' to executor " << *executor;
RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
@@ -1933,8 +1926,7 @@ void Slave::killTask(
<< " Unregistered executor " << executor->id
<< " has launched tasks";
- LOG(WARNING) << "Killing the unregistered executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(WARNING) << "Killing the unregistered executor " << *executor
<< " because it has no tasks";
executor->state = Executor::TERMINATING;
@@ -1946,9 +1938,8 @@ void Slave::killTask(
case Executor::TERMINATING:
case Executor::TERMINATED:
LOG(WARNING) << "Ignoring kill task " << taskId
- << " of framework " << frameworkId
- << " because the executor '" << executor->id
- << "' is terminating/terminated";
+ << " because the executor " << *executor
+ << " is terminating/terminated";
break;
case Executor::RUNNING: {
if (executor->queuedTasks.contains(taskId)) {
@@ -1980,8 +1971,7 @@ void Slave::killTask(
break;
}
default:
- LOG(FATAL) << " Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -2113,8 +2103,7 @@ void Slave::schedulerMessage(
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
- LOG(WARNING) << "Dropping message for executor '"
- << executorId << "' of framework " << frameworkId
+ LOG(WARNING) << "Dropping message for executor " << *executor
<< " because executor does not exist";
metrics.invalid_framework_messages++;
return;
@@ -2128,8 +2117,7 @@ void Slave::schedulerMessage(
// message? It's probably okay to just drop it since frameworks
// can have the executor send a message to the master to say when
// it's ready.
- LOG(WARNING) << "Dropping message for executor '"
- << executorId << "' of framework " << frameworkId
+ LOG(WARNING) << "Dropping message for executor " << *executor
<< " because executor is not running";
metrics.invalid_framework_messages++;
break;
@@ -2144,8 +2132,7 @@ void Slave::schedulerMessage(
break;
}
default:
- LOG(FATAL) << " Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -2470,8 +2457,7 @@ void Slave::registerExecutor(
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
case Executor::RUNNING:
- LOG(WARNING) << "Shutting down executor '" << executorId
- << "' of framework " << frameworkId
+ LOG(WARNING) << "Shutting down executor " << *executor
<< " because it is in unexpected state " << executor->state;
reply(ShutdownExecutorMessage());
break;
@@ -2530,8 +2516,7 @@ void Slave::registerExecutor(
break;
}
default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -2587,8 +2572,7 @@ void Slave::reregisterExecutor(
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
case Executor::RUNNING:
- LOG(WARNING) << "Shutting down executor '" << executorId
- << "' of framework " << frameworkId
+ LOG(WARNING) << "Shutting down executor " << *executor
<< " because it is in unexpected state " << executor->state;
reply(ShutdownExecutorMessage());
break;
@@ -2664,8 +2648,7 @@ void Slave::reregisterExecutor(
break;
}
default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -2726,8 +2709,7 @@ void Slave::reregisterExecutorTimeout()
// exited! This is because if the executor properly exited,
// it should have already been identified by the isolator
// (via the reaper) and cleaned up!
- LOG(INFO) << "Killing un-reregistered executor '" << executor->id
- << "' of framework " << framework->id();
+ LOG(INFO) << "Killing un-reregistered executor " << *executor;
containerizer->destroy(executor->containerId);
@@ -2745,8 +2727,7 @@ void Slave::reregisterExecutorTimeout()
break;
}
default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -2893,12 +2874,10 @@ void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
// 400 Bad Request response, indicating the reason in the body.
if (status.source() == TaskStatus::SOURCE_EXECUTOR &&
status.state() == TASK_STAGING) {
- LOG(ERROR) << "Received TASK_STAGING from executor " << executor->pid
- << " of framework " << update.framework_id()
+ LOG(ERROR) << "Received TASK_STAGING from executor " << *executor
<< " which is not allowed. Shutting down the executor";
_shutdownExecutor(framework, executor);
-
return;
}
@@ -3448,8 +3427,7 @@ void Slave::executorLaunched(
switch (executor->state) {
case Executor::TERMINATING:
- LOG(WARNING) << "Killing executor '" << executorId
- << "' of framework '" << frameworkId
+ LOG(WARNING) << "Killing executor " << *executor
<< "' because the executor is terminating";
containerizer->destroy(containerId);
break;
@@ -3458,8 +3436,7 @@ void Slave::executorLaunched(
break;
case Executor::TERMINATED:
default:
- LOG(FATAL) << " Executor '" << executorId
- << "' of framework '" << frameworkId
+ LOG(FATAL) << "Executor " << *executor
<< "' is in an unexpected state " << executor->state;
break;
}
@@ -3598,8 +3575,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
- LOG(INFO) << "Cleaning up executor '" << executor->id
- << "' of framework " << framework->id();
+ LOG(INFO) << "Cleaning up executor " << *executor;
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
@@ -3805,8 +3781,7 @@ void Slave::_shutdownExecutor(Framework* framework, Executor* executor)
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
- LOG(INFO) << "Shutting down executor '" << executor->id
- << "' of framework " << framework->id();
+ LOG(INFO) << "Shutting down executor " << *executor;
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
@@ -3859,8 +3834,7 @@ void Slave::shutdownExecutorTimeout(
// Make sure this timeout is valid.
if (executor->containerId != containerId) {
- LOG(INFO) << "A new executor '" << executorId
- << "' of framework " << frameworkId
+ LOG(INFO) << "A new executor " << *executor
<< " with run " << executor->containerId
<< " seems to be active. Ignoring the shutdown timeout"
<< " for the old executor run " << containerId;
@@ -3869,19 +3843,15 @@ void Slave::shutdownExecutorTimeout(
switch (executor->state) {
case Executor::TERMINATED:
- LOG(INFO) << "Executor '" << executorId
- << "' of framework " << frameworkId
- << " has already terminated";
+ LOG(INFO) << "Executor " << *executor << " has already terminated";
break;
case Executor::TERMINATING:
- LOG(INFO) << "Killing executor '" << executor->id
- << "' of framework " << framework->id();
+ LOG(INFO) << "Killing executor " << *executor;
containerizer->destroy(executor->containerId);
break;
default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -3921,8 +3891,7 @@ void Slave::registerExecutorTimeout(
}
if (executor->containerId != containerId) {
- LOG(INFO) << "A new executor '" << executorId
- << "' of framework " << frameworkId
+ LOG(INFO) << "A new executor " << *executor
<< " with run " << executor->containerId
<< " seems to be active. Ignoring the registration timeout"
<< " for the old executor run " << containerId;
@@ -3936,8 +3905,7 @@ void Slave::registerExecutorTimeout(
// Ignore the registration timeout.
break;
case Executor::REGISTERING: {
- LOG(INFO) << "Terminating executor " << executor->id
- << " of framework " << framework->id()
+ LOG(INFO) << "Terminating executor " << *executor
<< " because it did not register within "
<< flags.executor_registration_timeout;
@@ -3957,8 +3925,7 @@ void Slave::registerExecutorTimeout(
break;
}
default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -4115,29 +4082,22 @@ Future<Nothing> Slave::_recover()
if (flags.recover == "reconnect") {
if (executor->pid) {
- LOG(INFO) << "Sending reconnect request to executor " << executor->id
- << " of framework " << framework->id()
- << " at " << executor->pid;
+ LOG(INFO) << "Sending reconnect request to executor " << *executor;
ReconnectExecutorMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
send(executor->pid, message);
} else {
- LOG(INFO) << "Unable to reconnect to executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(INFO) << "Unable to reconnect to executor " << *executor
<< " because no libprocess PID was found";
}
} else {
if (executor->pid) {
// Cleanup executors.
- LOG(INFO) << "Sending shutdown to executor '" << executor->id
- << "' of framework " << framework->id()
- << " to " << executor->pid;
-
+ LOG(INFO) << "Sending shutdown to executor " << *executor;
_shutdownExecutor(framework, executor);
} else {
- LOG(INFO) << "Killing executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(INFO) << "Killing executor " << *executor
<< " because no libprocess PID was found";
containerizer->destroy(executor->containerId);
@@ -4476,8 +4436,7 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
kill.has_container_id() ? kill.container_id() : executor->containerId;
if (containerId != executor->containerId) {
LOG(WARNING) << "Ignoring QoS correction KILL on container '"
- << containerId << "' for executor '"
- << executorId << "' of framework " << frameworkId
+ << containerId << "' for executor " << *executor
<< ": container cannot be found";
continue;
}
@@ -4486,8 +4445,7 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
case Executor::REGISTERING:
case Executor::RUNNING: {
LOG(INFO) << "Killing container '" << containerId
- << "' for executor '" << executorId
- << "' of framework " << frameworkId
+ << "' for executor " << *executor
<< " as QoS correction";
containerizer->destroy(containerId);
@@ -4513,13 +4471,12 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
}
case Executor::TERMINATING:
case Executor::TERMINATED:
- LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
- << executorId << "' of framework " << frameworkId
- << ": executor is " << executor->state;
+ LOG(WARNING) << "Ignoring QoS correction KILL on executor "
+ << *executor << " because the executor is in "
+ << executor->state << " state";
break;
default:
- LOG(FATAL) << " Executor '" << executor->id
- << "' of framework " << framework->id()
+ LOG(FATAL) << "Executor " << *executor
<< " is in unexpected state " << executor->state;
break;
}
@@ -5402,6 +5359,18 @@ bool Executor::isCommandExecutor() const
}
+std::ostream& operator<<(std::ostream& stream, const Executor& executor)
+{
+ stream << "'" << executor.id << "' of framework " << executor.frameworkId;
+
+ if (executor.pid) {
+ stream << " at " << executor.pid;
+ }
+
+ return stream;
+}
+
+
std::ostream& operator<<(std::ostream& stream, Slave::State state)
{
switch (state) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/02c7d93c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d81a9c4..89338e5 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -695,6 +695,7 @@ private:
};
+std::ostream& operator<<(std::ostream& stream, const Executor& executor);
std::ostream& operator<<(std::ostream& stream, Slave::State state);
std::ostream& operator<<(std::ostream& stream, Framework::State state);
std::ostream& operator<<(std::ostream& stream, Executor::State state);