You are viewing a plain-text version of this content. The link to the canonical version ("here") was lost when this page was converted to plain text.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/06/17 02:43:51 UTC

[2/3] mesos git commit: Added kill executor correction to slave.

Added kill executor correction to slave.

Review: https://reviews.apache.org/r/34720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cbbf840
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cbbf840
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cbbf840

Branch: refs/heads/master
Commit: 8cbbf84068e02cfb5899de8255ac6227713cf7e0
Parents: 1a7d815
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:35 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:37 2015 -0700

----------------------------------------------------------------------
 src/slave/flags.cpp                  |   8 ++
 src/slave/flags.hpp                  |   1 +
 src/slave/slave.cpp                  | 138 +++++++++++++++++++++++++++---
 src/slave/slave.hpp                  |  10 ++-
 src/tests/mesos.cpp                  |   8 ++
 src/tests/mesos.hpp                  |   6 +-
 src/tests/oversubscription_tests.cpp |   2 +-
 7 files changed, 160 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 93690cf..cbf431e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -522,6 +522,14 @@ mesos::internal::slave::Flags::Flags()
       "qos_controller",
       "The name of the QoS Controller to use for oversubscription.");
 
+  add(&Flags::qos_correction_interval_min,
+      "qos_correction_interval_min",
+      "The slave polls and carries out QoS corrections from the QoS\n"
+      "Controller based on its observed performance of running tasks.\n"
+      "The smallest interval between these corrections is controlled by\n"
+      "this flag.",
+      Seconds(0));
+
   add(&Flags::oversubscribed_resources_interval,
       "oversubscribed_resources_interval",
       "The slave periodically updates the master with the current estimation\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6c24e56..7634e36 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -109,6 +109,7 @@ public:
   Option<std::string> hooks;
   Option<std::string> resource_estimator;
   Option<std::string> qos_controller;
+  Duration qos_correction_interval_min;
   Duration oversubscribed_resources_interval;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3614330..19f9013 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -89,6 +89,7 @@
 #include "slave/status_update_manager.hpp"
 
 using mesos::slave::QoSController;
+using mesos::slave::QoSCorrection;
 using mesos::slave::ResourceEstimator;
 
 using std::list;
@@ -3979,8 +3980,7 @@ void Slave::__recover(const Future<Nothing>& future)
     forwardOversubscribed();
 
     // Start acting on correction from QoS Controller.
-    qosController->corrections()
-      .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+    qosCorrections();
   } else {
     // Slave started in cleanup mode.
     CHECK_EQ("cleanup", flags.recover);
@@ -4127,20 +4127,127 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
-void Slave::qosCorrections(
-    const Future<list<mesos::slave::QoSCorrection>>& future)
+void Slave::qosCorrections()
 {
+  qosController->corrections()
+    .onAny(defer(self(), &Self::_qosCorrections, lambda::_1));
+}
+
+
+void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
+{
+  // Make sure correction handler is scheduled again.
+  delay(flags.qos_correction_interval_min,
+        self(),
+        &Self::qosCorrections);
+
+  // Verify slave state.
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state == RECOVERING || state == TERMINATING) {
+    LOG(WARNING) << "Cannot perform QoS corrections because the slave is "
+                 << state;
+    return;
+  }
+
   if (!future.isReady()) {
     LOG(WARNING) << "Failed to get corrections from QoS Controller: "
                   << (future.isFailed() ? future.failure() : "discarded");
-  } else {
+    return;
+  }
+
+  const list<QoSCorrection>& corrections = future.get();
+
+  LOG(INFO) << "Received " << corrections.size() << " QoS corrections";
+
+  foreach (const QoSCorrection& correction, corrections) {
     // TODO(nnielsen): Print correction, once the operator overload
     // for QoSCorrection has been implemented.
-    LOG(INFO) << "Received new QoS corrections";
-  }
+    if (correction.type() == QoSCorrection::KILL) {
+      const QoSCorrection::Kill& kill = correction.kill();
 
-  qosController->corrections()
-    .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+      if (!kill.has_framework_id()) {
+        LOG(WARNING) << "Ignoring QoS correction KILL: "
+                     << "framework id not specified.";
+        continue;
+      }
+
+      const FrameworkID& frameworkId = kill.framework_id();
+
+      if (!kill.has_executor_id()) {
+        // TODO(nnielsen): For now, only executor killing is supported. Check
+        // can be removed when task killing is supported as well.
+        LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+                     << frameworkId << ": executor id not specified";
+        continue;
+      }
+
+      const ExecutorID& executorId = kill.executor_id();
+
+      Framework* framework = getFramework(frameworkId);
+      if (framework == NULL) {
+        LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+                     << frameworkId << ": framework cannot be found";
+        continue;
+      }
+
+      // Verify framework state.
+      CHECK(framework->state == Framework::RUNNING ||
+            framework->state == Framework::TERMINATING)
+        << framework->state;
+
+      if (framework->state == Framework::TERMINATING) {
+        LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+                     << frameworkId << ": framework is terminating.";
+        continue;
+      }
+
+      Executor* executor = framework->getExecutor(executorId);
+      if (executor == NULL) {
+        LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+                     << executorId << "' of framework " << frameworkId
+                     << ": executor cannot be found";
+        continue;
+      }
+
+      switch (executor->state) {
+        case Executor::REGISTERING:
+        case Executor::RUNNING: {
+          LOG(INFO) << "Killing executor '" << executorId
+                    << "' of framework " << frameworkId
+                    << " as QoS correction";
+
+          // TODO(nnielsen): We should ensure that we are addressing
+          // the _container_ which the QoS controller intended to
+          // kill. Without this check, we may run into a scenario
+          // where the executor has terminated and one with the same
+          // id has started in the interim i.e. running in a different
+          // container than the one the QoS controller targeted
+          // (MESOS-2875).
+          executor->state = Executor::TERMINATING;
+          executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED;
+          containerizer->destroy(executor->containerId);
+          break;
+        }
+        case Executor::TERMINATING:
+        case Executor::TERMINATED:
+          LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+                       << executorId << "' of framework " << frameworkId
+                       << ": executor is " << executor->state;
+          break;
+        default:
+          LOG(FATAL) << " Executor '" << executor->id
+                     << "' of framework " << framework->id()
+                     << " is in unexpected state " << executor->state;
+          break;
+      }
+    } else {
+      LOG(WARNING) << "QoS correction type " << correction.type()
+                   << " is not supported";
+    }
+  }
 }
 
 
@@ -4305,7 +4412,18 @@ void Slave::sendExecutorTerminatedStatusUpdate(
   mesos::TaskState taskState = TASK_LOST;
   TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
 
-  if (termination.isReady() && termination.get().killed()) {
+  CHECK_NOTNULL(executor);
+
+  if (executor->reason.isSome()) {
+    // TODO(nnielsen): We want to dispatch the task status and reason
+    // from the termination reason (MESOS-2035) and the executor
+    // reason based on a specific policy i.e. if the termination
+    // reason is set, this overrides executor->reason. At the moment,
+    // we infer the containerizer reason for killing from 'killed'
+    // field in 'termination' and are explicitly overriding the task
+    // status and reason.
+    reason = executor->reason.get();
+  } else if (termination.isReady() && termination.get().killed()) {
     taskState = TASK_FAILED;
     // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination.
     reason = TaskStatus::REASON_MEMORY_LIMIT;

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dbed46d..f1cf3b8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -354,7 +354,10 @@ public:
   void signaled(int signal, int uid);
 
   // Made 'virtual' for Slave mocking.
-  virtual void qosCorrections(
+  virtual void qosCorrections();
+
+  // Made 'virtual' for Slave mocking.
+  virtual void _qosCorrections(
       const process::Future<std::list<
           mesos::slave::QoSCorrection>>& correction);
 
@@ -605,6 +608,11 @@ struct Executor
   // attempts to do some memset's which are unsafe).
   boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
 
+  // The 'reason' is for the slave to encode the reason behind a
+  // terminal status update for those pending/unterminated tasks when
+  // the executor is terminated.
+  Option<TaskStatus::Reason> reason;
+
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 509f9f2..dbf8c7c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -455,6 +455,8 @@ MockSlave::MockSlave(const slave::Flags& flags,
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
   EXPECT_CALL(*this, __recover(_))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover));
+  EXPECT_CALL(*this, qosCorrections())
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked_qosCorrections));
 }
 
 
@@ -506,6 +508,12 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
 }
 
 
+void MockSlave::unmocked_qosCorrections()
+{
+  slave::Slave::qosCorrections();
+}
+
+
 MockFetcherProcess::MockFetcherProcess()
 {
   // Set up default behaviors, calling the original methods.

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ecdf910..2a96618 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -849,7 +849,11 @@ public:
   void unmocked___recover(
       const process::Future<Nothing>& future);
 
-  MOCK_METHOD1(qosCorrections, void(
+  MOCK_METHOD0(qosCorrections, void());
+
+  void unmocked_qosCorrections();
+
+  MOCK_METHOD1(_qosCorrections, void(
       const process::Future<std::list<
           mesos::slave::QoSCorrection>>& correction));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 3481ad2..5c6bed7 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -764,7 +764,7 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
   MockSlave slave(CreateSlaveFlags(), &detector, &containerizer, &controller);
 
   Future<list<QoSCorrection>> qosCorrections;
-  EXPECT_CALL(slave, qosCorrections(_))
+  EXPECT_CALL(slave, _qosCorrections(_))
     .WillOnce(FutureArg<0>(&qosCorrections));
 
   spawn(slave);