You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/10/23 03:27:39 UTC

mesos git commit: Added output operator for Executor struct in agent.

Repository: mesos
Updated Branches:
  refs/heads/master 837a13f3b -> 02c7d93ce


Added output operator for Executor struct in agent.

Review: https://reviews.apache.org/r/39569


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02c7d93c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02c7d93c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02c7d93c

Branch: refs/heads/master
Commit: 02c7d93ceefce19743b0e043ead62fb02a160dbd
Parents: 837a13f
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Oct 22 18:25:55 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Thu Oct 22 18:27:15 2015 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 141 ++++++++++++++++++-----------------------------
 src/slave/slave.hpp |   1 +
 2 files changed, 56 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/02c7d93c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e9f2d1b..582411c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1618,8 +1618,7 @@ void Slave::_runTask(
 
       // Queue task if the executor has not yet registered.
       LOG(INFO) << "Queuing task '" << task.task_id()
-                  << "' for executor " << executorId
-                  << " of framework '" << frameworkId;
+                  << "' for executor " << *executor;
 
       executor->queuedTasks[task.task_id()] = task;
       break;
@@ -1632,8 +1631,7 @@ void Slave::_runTask(
       // Queue task until the containerizer is updated with new
       // resource limits (MESOS-998).
       LOG(INFO) << "Queuing task '" << task.task_id()
-                  << "' for executor " << executorId
-                  << " of framework '" << frameworkId;
+                  << "' for executor " << *executor;
 
       executor->queuedTasks[task.task_id()] = task;
 
@@ -1660,9 +1658,8 @@ void Slave::_runTask(
       break;
     }
     default:
-      LOG(FATAL) << "Executor '" << executor->id
-                 << "' of framework " << framework->id()
-                 << " is in unexpected state " << executor->state;
+      LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
+                 << executor->state;
       break;
   }
 
@@ -1747,8 +1744,7 @@ void Slave::runTasks(
   // when the original instance of the executor was shutting down.
   if (executor->containerId != containerId) {
     LOG(WARNING) << "Ignoring sending queued tasks '" << taskIds
-                 << " to executor '" << executorId
-                 << "' of framework " << frameworkId
+                 << " to executor " << *executor
                  << " because the target container " << containerId
                  << " has exited";
     return;
@@ -1765,8 +1761,7 @@ void Slave::runTasks(
   // 'executorTerminated'.
   if (executor->state != Executor::RUNNING) {
     LOG(WARNING) << "Ignoring sending queued tasks " << taskIds
-                 << " to executor '" << executorId
-                 << "' of framework " << frameworkId
+                 << " to executor " << *executor
                  << " because the executor is in "
                  << executor->state << " state";
     return;
@@ -1777,8 +1772,7 @@ void Slave::runTasks(
     // status update because it should be handled in 'killTask'.
     if (!executor->queuedTasks.contains(task.task_id())) {
       LOG(WARNING) << "Ignoring sending queued task '" << task.task_id()
-                   << "' to executor '" << executorId
-                   << "' of framework " << frameworkId
+                   << "' to executor " << *executor
                    << " because the task has been killed";
       continue;
     }
@@ -1789,8 +1783,7 @@ void Slave::runTasks(
     executor->addTask(task);
 
     LOG(INFO) << "Sending queued task '" << task.task_id()
-              << "' to executor '" << executor->id
-              << "' of framework " << framework->id();
+              << "' to executor " << *executor;
 
     RunTaskMessage message;
     message.mutable_framework()->MergeFrom(framework->info);
@@ -1933,8 +1926,7 @@ void Slave::killTask(
             << " Unregistered executor " << executor->id
             << " has launched tasks";
 
-        LOG(WARNING) << "Killing the unregistered executor '" << executor->id
-                     << "' of framework " << framework->id()
+        LOG(WARNING) << "Killing the unregistered executor " << *executor
                      << " because it has no tasks";
 
         executor->state = Executor::TERMINATING;
@@ -1946,9 +1938,8 @@ void Slave::killTask(
     case Executor::TERMINATING:
     case Executor::TERMINATED:
       LOG(WARNING) << "Ignoring kill task " << taskId
-                   << " of framework " << frameworkId
-                   << " because the executor '" << executor->id
-                   << "' is terminating/terminated";
+                   << " because the executor " << *executor
+                   << " is terminating/terminated";
       break;
     case Executor::RUNNING: {
       if (executor->queuedTasks.contains(taskId)) {
@@ -1980,8 +1971,7 @@ void Slave::killTask(
       break;
     }
     default:
-      LOG(FATAL) << " Executor '" << executor->id
-                 << "' of framework " << framework->id()
+      LOG(FATAL) << "Executor " << *executor
                  << " is in unexpected state " << executor->state;
       break;
   }
@@ -2113,8 +2103,7 @@ void Slave::schedulerMessage(
 
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
-    LOG(WARNING) << "Dropping message for executor '"
-                 << executorId << "' of framework " << frameworkId
+    LOG(WARNING) << "Dropping message for executor " << executorId
                  << " because executor does not exist";
     metrics.invalid_framework_messages++;
     return;
@@ -2128,8 +2117,7 @@ void Slave::schedulerMessage(
       // message? It's probably okay to just drop it since frameworks
       // can have the executor send a message to the master to say when
       // it's ready.
-      LOG(WARNING) << "Dropping message for executor '"
-                   << executorId << "' of framework " << frameworkId
+      LOG(WARNING) << "Dropping message for executor " << *executor
                    << " because executor is not running";
       metrics.invalid_framework_messages++;
       break;
@@ -2144,8 +2132,7 @@ void Slave::schedulerMessage(
       break;
     }
     default:
-      LOG(FATAL) << " Executor '" << executor->id
-                 << "' of framework " << framework->id()
+      LOG(FATAL) << "Executor " << *executor
                  << " is in unexpected state " << executor->state;
       break;
   }
@@ -2470,8 +2457,7 @@ void Slave::registerExecutor(
       // TERMINATED is possible if the executor forks, the parent process
       // terminates and the child process (driver) tries to register!
     case Executor::RUNNING:
-      LOG(WARNING) << "Shutting down executor '" << executorId
-                   << "' of framework " << frameworkId
+      LOG(WARNING) << "Shutting down executor " << *executor
                    << " because it is in unexpected state " << executor->state;
       reply(ShutdownExecutorMessage());
       break;
@@ -2530,8 +2516,7 @@ void Slave::registerExecutor(
       break;
     }
     default:
-      LOG(FATAL) << "Executor '" << executor->id
-                 << "' of framework " << framework->id()
+      LOG(FATAL) << "Executor " << *executor
                  << " is in unexpected state " << executor->state;
       break;
   }
@@ -2587,8 +2572,7 @@ void Slave::reregisterExecutor(
       // TERMINATED is possible if the executor forks, the parent process
       // terminates and the child process (driver) tries to register!
     case Executor::RUNNING:
-      LOG(WARNING) << "Shutting down executor '" << executorId
-                   << "' of framework " << frameworkId
+      LOG(WARNING) << "Shutting down executor " << *executor
                    << " because it is in unexpected state " << executor->state;
       reply(ShutdownExecutorMessage());
       break;
@@ -2664,8 +2648,7 @@ void Slave::reregisterExecutor(
       break;
     }
     default:
-      LOG(FATAL) << "Executor '" << executor->id
-                 << "' of framework " << framework->id()
+      LOG(FATAL) << "Executor " << *executor
                  << " is in unexpected state " << executor->state;
       break;
   }
@@ -2726,8 +2709,7 @@ void Slave::reregisterExecutorTimeout()
           // exited! This is because if the executor properly exited,
           // it should have already been identified by the isolator
           // (via the reaper) and cleaned up!
-          LOG(INFO) << "Killing un-reregistered executor '" << executor->id
-                    << "' of framework " << framework->id();
+          LOG(INFO) << "Killing un-reregistered executor " << *executor;
 
           containerizer->destroy(executor->containerId);
 
@@ -2745,8 +2727,7 @@ void Slave::reregisterExecutorTimeout()
           break;
         }
         default:
-          LOG(FATAL) << "Executor '" << executor->id
-                     << "' of framework " << framework->id()
+          LOG(FATAL) << "Executor " << *executor
                      << " is in unexpected state " << executor->state;
           break;
       }
@@ -2893,12 +2874,10 @@ void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
   // 400 Bad Request response, indicating the reason in the body.
   if (status.source() == TaskStatus::SOURCE_EXECUTOR &&
       status.state() == TASK_STAGING) {
-    LOG(ERROR) << "Received TASK_STAGING from executor " << executor->pid
-               << " of framework " << update.framework_id()
+    LOG(ERROR) << "Received TASK_STAGING from executor " << *executor
                << " which is not allowed. Shutting down the executor";
 
     _shutdownExecutor(framework, executor);
-
     return;
   }
 
@@ -3448,8 +3427,7 @@ void Slave::executorLaunched(
 
   switch (executor->state) {
     case Executor::TERMINATING:
-      LOG(WARNING) << "Killing executor '" << executorId
-                   << "' of framework '" << frameworkId
+      LOG(WARNING) << "Killing executor " << *executor
                    << "' because the executor is terminating";
       containerizer->destroy(containerId);
       break;
@@ -3458,8 +3436,7 @@ void Slave::executorLaunched(
       break;
     case Executor::TERMINATED:
     default:
-      LOG(FATAL) << " Executor '" << executorId
-                 << "' of framework '" << frameworkId
+      LOG(FATAL) << "Executor " << *executor
                  << "' is in an unexpected state " << executor->state;
       break;
   }
@@ -3598,8 +3575,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(executor);
 
-  LOG(INFO) << "Cleaning up executor '" << executor->id
-            << "' of framework " << framework->id();
+  LOG(INFO) << "Cleaning up executor " << *executor;
 
   CHECK(framework->state == Framework::RUNNING ||
         framework->state == Framework::TERMINATING)
@@ -3805,8 +3781,7 @@ void Slave::_shutdownExecutor(Framework* framework, Executor* executor)
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(executor);
 
-  LOG(INFO) << "Shutting down executor '" << executor->id
-            << "' of framework " << framework->id();
+  LOG(INFO) << "Shutting down executor " << *executor;
 
   CHECK(framework->state == Framework::RUNNING ||
         framework->state == Framework::TERMINATING)
@@ -3859,8 +3834,7 @@ void Slave::shutdownExecutorTimeout(
 
   // Make sure this timeout is valid.
   if (executor->containerId != containerId) {
-    LOG(INFO) << "A new executor '" << executorId
-              << "' of framework " << frameworkId
+    LOG(INFO) << "A new executor " << *executor
               << " with run " << executor->containerId
               << " seems to be active. Ignoring the shutdown timeout"
               << " for the old executor run " << containerId;
@@ -3869,19 +3843,15 @@ void Slave::shutdownExecutorTimeout(
 
   switch (executor->state) {
     case Executor::TERMINATED:
-      LOG(INFO) << "Executor '" << executorId
-                << "' of framework " << frameworkId
-                << " has already terminated";
+      LOG(INFO) << "Executor " << *executor << " has already terminated";
       break;
     case Executor::TERMINATING:
-      LOG(INFO) << "Killing executor '" << executor->id
-                << "' of framework " << framework->id();
+      LOG(INFO) << "Killing executor " << *executor;
 
       containerizer->destroy(executor->containerId);
       break;
     default:
-      LOG(FATAL) << "Executor '" << executor->id
-                 << "' of framework " << framework->id()
+      LOG(FATAL) << "Executor " << *executor
                  << " is in unexpected state " << executor->state;
       break;
   }
@@ -3921,8 +3891,7 @@ void Slave::registerExecutorTimeout(
   }
 
   if (executor->containerId != containerId) {
-    LOG(INFO) << "A new executor '" << executorId
-              << "' of framework " << frameworkId
+    LOG(INFO) << "A new executor " << *executor
               << " with run " << executor->containerId
               << " seems to be active. Ignoring the registration timeout"
               << " for the old executor run " << containerId;
@@ -3936,8 +3905,7 @@ void Slave::registerExecutorTimeout(
       // Ignore the registration timeout.
       break;
     case Executor::REGISTERING: {
-      LOG(INFO) << "Terminating executor " << executor->id
-                << " of framework " << framework->id()
+      LOG(INFO) << "Terminating executor " << *executor
                 << " because it did not register within "
                 << flags.executor_registration_timeout;
 
@@ -3957,8 +3925,7 @@ void Slave::registerExecutorTimeout(
       break;
     }
     default:
-      LOG(FATAL) << "Executor '" << executor->id
-                 << "' of framework " << framework->id()
+      LOG(FATAL) << "Executor " << *executor
                  << " is in unexpected state " << executor->state;
       break;
   }
@@ -4115,29 +4082,22 @@ Future<Nothing> Slave::_recover()
 
       if (flags.recover == "reconnect") {
         if (executor->pid) {
-          LOG(INFO) << "Sending reconnect request to executor " << executor->id
-                    << " of framework " << framework->id()
-                    << " at " << executor->pid;
+          LOG(INFO) << "Sending reconnect request to executor " << *executor;
 
           ReconnectExecutorMessage message;
           message.mutable_slave_id()->MergeFrom(info.id());
           send(executor->pid, message);
         } else {
-          LOG(INFO) << "Unable to reconnect to executor '" << executor->id
-                    << "' of framework " << framework->id()
+          LOG(INFO) << "Unable to reconnect to executor " << *executor
                     << " because no libprocess PID was found";
         }
       } else {
         if (executor->pid) {
           // Cleanup executors.
-          LOG(INFO) << "Sending shutdown to executor '" << executor->id
-                    << "' of framework " << framework->id()
-                    << " to " << executor->pid;
-
+          LOG(INFO) << "Sending shutdown to executor " << *executor;
           _shutdownExecutor(framework, executor);
         } else {
-          LOG(INFO) << "Killing executor '" << executor->id
-                    << "' of framework " << framework->id()
+          LOG(INFO) << "Killing executor " << *executor
                     << " because no libprocess PID was found";
 
           containerizer->destroy(executor->containerId);
@@ -4476,8 +4436,7 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
           kill.has_container_id() ? kill.container_id() : executor->containerId;
       if (containerId != executor->containerId) {
         LOG(WARNING) << "Ignoring QoS correction KILL on container '"
-                     << containerId << "' for executor '"
-                     << executorId << "' of framework " << frameworkId
+                     << containerId << "' for executor " << *executor
                      << ": container cannot be found";
         continue;
       }
@@ -4486,8 +4445,7 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
         case Executor::REGISTERING:
         case Executor::RUNNING: {
           LOG(INFO) << "Killing container '" << containerId
-                    << "' for executor '" << executorId
-                    << "' of framework " << frameworkId
+                    << "' for executor " << *executor
                     << " as QoS correction";
 
           containerizer->destroy(containerId);
@@ -4513,13 +4471,12 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
         }
         case Executor::TERMINATING:
         case Executor::TERMINATED:
-          LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
-                       << executorId << "' of framework " << frameworkId
-                       << ": executor is " << executor->state;
+          LOG(WARNING) << "Ignoring QoS correction KILL on executor "
+                       << *executor << " because the executor is in "
+                       << executor->state << " state";
           break;
         default:
-          LOG(FATAL) << " Executor '" << executor->id
-                     << "' of framework " << framework->id()
+          LOG(FATAL) << "Executor " << *executor
                      << " is in unexpected state " << executor->state;
           break;
       }
@@ -5402,6 +5359,18 @@ bool Executor::isCommandExecutor() const
 }
 
 
+// Prints "'<id>' of framework <frameworkId>", plus " at <pid>" if pid is set.
+std::ostream& operator<<(std::ostream& stream, const Executor& executor)
+{
+  stream << "'" << executor.id << "' of framework " << executor.frameworkId;
+
+  if (!executor.pid) {
+    return stream;
+  }
+
+  return stream << " at " << executor.pid;
+}
+
+
 std::ostream& operator<<(std::ostream& stream, Slave::State state)
 {
   switch (state) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/02c7d93c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d81a9c4..89338e5 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -695,6 +695,7 @@ private:
 };
 
 
+std::ostream& operator<<(std::ostream& stream, const Executor& executor);
 std::ostream& operator<<(std::ostream& stream, Slave::State state);
 std::ostream& operator<<(std::ostream& stream, Framework::State state);
 std::ostream& operator<<(std::ostream& stream, Executor::State state);