You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/06/17 02:43:50 UTC

[1/3] mesos git commit: Added REASON_EXECUTOR_PREEMPTED as status reason.

Repository: mesos
Updated Branches:
  refs/heads/master fa0d564a9 -> 02160a1ce


Added REASON_EXECUTOR_PREEMPTED as status reason.

Review: https://reviews.apache.org/r/34719


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a7d815c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a7d815c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a7d815c

Branch: refs/heads/master
Commit: 1a7d815cdf4fb959d0b30782dc3024000e44fba8
Parents: fa0d564
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:26 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:26 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1a7d815c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index e5b9884..8df1211 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -883,6 +883,7 @@ message TaskStatus {
   // (e.g. unhealthy vs. unregistered for maintenance).
   enum Reason {
     REASON_COMMAND_EXECUTOR_FAILED = 0;
+    REASON_EXECUTOR_PREEMPTED = 17;
     REASON_EXECUTOR_TERMINATED = 1;
     REASON_EXECUTOR_UNREGISTERED = 2;
     REASON_FRAMEWORK_REMOVED = 3;


[2/3] mesos git commit: Added kill executor correction to slave.

Posted by nn...@apache.org.
Added kill executor correction to slave.

Review: https://reviews.apache.org/r/34720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cbbf840
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cbbf840
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cbbf840

Branch: refs/heads/master
Commit: 8cbbf84068e02cfb5899de8255ac6227713cf7e0
Parents: 1a7d815
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:35 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:37 2015 -0700

----------------------------------------------------------------------
 src/slave/flags.cpp                  |   8 ++
 src/slave/flags.hpp                  |   1 +
 src/slave/slave.cpp                  | 138 +++++++++++++++++++++++++++---
 src/slave/slave.hpp                  |  10 ++-
 src/tests/mesos.cpp                  |   8 ++
 src/tests/mesos.hpp                  |   6 +-
 src/tests/oversubscription_tests.cpp |   2 +-
 7 files changed, 160 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 93690cf..cbf431e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -522,6 +522,14 @@ mesos::internal::slave::Flags::Flags()
       "qos_controller",
       "The name of the QoS Controller to use for oversubscription.");
 
+  add(&Flags::qos_correction_interval_min,
+      "qos_correction_interval_min",
+      "The slave polls and carries out QoS corrections from the QoS\n"
+      "Controller based on its observed performance of running tasks.\n"
+      "The smallest interval between these corrections is controlled by\n"
+      "this flag.",
+      Seconds(0));
+
   add(&Flags::oversubscribed_resources_interval,
       "oversubscribed_resources_interval",
       "The slave periodically updates the master with the current estimation\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6c24e56..7634e36 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -109,6 +109,7 @@ public:
   Option<std::string> hooks;
   Option<std::string> resource_estimator;
   Option<std::string> qos_controller;
+  Duration qos_correction_interval_min;
   Duration oversubscribed_resources_interval;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3614330..19f9013 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -89,6 +89,7 @@
 #include "slave/status_update_manager.hpp"
 
 using mesos::slave::QoSController;
+using mesos::slave::QoSCorrection;
 using mesos::slave::ResourceEstimator;
 
 using std::list;
@@ -3979,8 +3980,7 @@ void Slave::__recover(const Future<Nothing>& future)
     forwardOversubscribed();
 
     // Start acting on correction from QoS Controller.
-    qosController->corrections()
-      .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+    qosCorrections();
   } else {
     // Slave started in cleanup mode.
     CHECK_EQ("cleanup", flags.recover);
@@ -4127,20 +4127,127 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
-void Slave::qosCorrections(
-    const Future<list<mesos::slave::QoSCorrection>>& future)
+void Slave::qosCorrections()
 {
+  qosController->corrections()
+    .onAny(defer(self(), &Self::_qosCorrections, lambda::_1));
+}
+
+
+void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
+{
+  // Make sure correction handler is scheduled again.
+  delay(flags.qos_correction_interval_min,
+        self(),
+        &Self::qosCorrections);
+
+  // Verify slave state.
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state == RECOVERING || state == TERMINATING) {
+    LOG(WARNING) << "Cannot perform QoS corrections because the slave is "
+                 << state;
+    return;
+  }
+
   if (!future.isReady()) {
     LOG(WARNING) << "Failed to get corrections from QoS Controller: "
                   << (future.isFailed() ? future.failure() : "discarded");
-  } else {
+    return;
+  }
+
+  const list<QoSCorrection>& corrections = future.get();
+
+  LOG(INFO) << "Received " << corrections.size() << " QoS corrections";
+
+  foreach (const QoSCorrection& correction, corrections) {
     // TODO(nnielsen): Print correction, once the operator overload
     // for QoSCorrection has been implemented.
-    LOG(INFO) << "Received new QoS corrections";
-  }
+    if (correction.type() == QoSCorrection::KILL) {
+      const QoSCorrection::Kill& kill = correction.kill();
 
-  qosController->corrections()
-    .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+      if (!kill.has_framework_id()) {
+        LOG(WARNING) << "Ignoring QoS correction KILL: "
+                     << "framework id not specified.";
+        continue;
+      }
+
+      const FrameworkID& frameworkId = kill.framework_id();
+
+      if (!kill.has_executor_id()) {
+        // TODO(nnielsen): For now, only executor killing is supported. Check
+        // can be removed when task killing is supported as well.
+        LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+                     << frameworkId << ": executor id not specified";
+        continue;
+      }
+
+      const ExecutorID& executorId = kill.executor_id();
+
+      Framework* framework = getFramework(frameworkId);
+      if (framework == NULL) {
+        LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+                     << frameworkId << ": framework cannot be found";
+        continue;
+      }
+
+      // Verify framework state.
+      CHECK(framework->state == Framework::RUNNING ||
+            framework->state == Framework::TERMINATING)
+        << framework->state;
+
+      if (framework->state == Framework::TERMINATING) {
+        LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+                     << frameworkId << ": framework is terminating.";
+        continue;
+      }
+
+      Executor* executor = framework->getExecutor(executorId);
+      if (executor == NULL) {
+        LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+                     << executorId << "' of framework " << frameworkId
+                     << ": executor cannot be found";
+        continue;
+      }
+
+      switch (executor->state) {
+        case Executor::REGISTERING:
+        case Executor::RUNNING: {
+          LOG(INFO) << "Killing executor '" << executorId
+                    << "' of framework " << frameworkId
+                    << " as QoS correction";
+
+          // TODO(nnielsen): We should ensure that we are addressing
+          // the _container_ which the QoS controller intended to
+          // kill. Without this check, we may run into a scenario
+          // where the executor has terminated and one with the same
+          // id has started in the interim i.e. running in a different
+          // container than the one the QoS controller targeted
+          // (MESOS-2875).
+          executor->state = Executor::TERMINATING;
+          executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED;
+          containerizer->destroy(executor->containerId);
+          break;
+        }
+        case Executor::TERMINATING:
+        case Executor::TERMINATED:
+          LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+                       << executorId << "' of framework " << frameworkId
+                       << ": executor is " << executor->state;
+          break;
+        default:
+          LOG(FATAL) << " Executor '" << executor->id
+                     << "' of framework " << framework->id()
+                     << " is in unexpected state " << executor->state;
+          break;
+      }
+    } else {
+      LOG(WARNING) << "QoS correction type " << correction.type()
+                   << " is not supported";
+    }
+  }
 }
 
 
@@ -4305,7 +4412,18 @@ void Slave::sendExecutorTerminatedStatusUpdate(
   mesos::TaskState taskState = TASK_LOST;
   TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
 
-  if (termination.isReady() && termination.get().killed()) {
+  CHECK_NOTNULL(executor);
+
+  if (executor->reason.isSome()) {
+    // TODO(nnielsen): We want to dispatch the task status and reason
+    // from the termination reason (MESOS-2035) and the executor
+    // reason based on a specific policy i.e. if the termination
+    // reason is set, this overrides executor->reason. At the moment,
+    // we infer the containerizer reason for killing from 'killed'
+    // field in 'termination' and are explicitly overriding the task
+    // status and reason.
+    reason = executor->reason.get();
+  } else if (termination.isReady() && termination.get().killed()) {
     taskState = TASK_FAILED;
     // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination.
     reason = TaskStatus::REASON_MEMORY_LIMIT;

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dbed46d..f1cf3b8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -354,7 +354,10 @@ public:
   void signaled(int signal, int uid);
 
   // Made 'virtual' for Slave mocking.
-  virtual void qosCorrections(
+  virtual void qosCorrections();
+
+  // Made 'virtual' for Slave mocking.
+  virtual void _qosCorrections(
       const process::Future<std::list<
           mesos::slave::QoSCorrection>>& correction);
 
@@ -605,6 +608,11 @@ struct Executor
   // attempts to do some memset's which are unsafe).
   boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
 
+  // The 'reason' is for the slave to encode the reason behind a
+  // terminal status update for those pending/unterminated tasks when
+  // the executor is terminated.
+  Option<TaskStatus::Reason> reason;
+
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 509f9f2..dbf8c7c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -455,6 +455,8 @@ MockSlave::MockSlave(const slave::Flags& flags,
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
   EXPECT_CALL(*this, __recover(_))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover));
+  EXPECT_CALL(*this, qosCorrections())
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked_qosCorrections));
 }
 
 
@@ -506,6 +508,12 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
 }
 
 
+void MockSlave::unmocked_qosCorrections()
+{
+  slave::Slave::qosCorrections();
+}
+
+
 MockFetcherProcess::MockFetcherProcess()
 {
   // Set up default behaviors, calling the original methods.

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ecdf910..2a96618 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -849,7 +849,11 @@ public:
   void unmocked___recover(
       const process::Future<Nothing>& future);
 
-  MOCK_METHOD1(qosCorrections, void(
+  MOCK_METHOD0(qosCorrections, void());
+
+  void unmocked_qosCorrections();
+
+  MOCK_METHOD1(_qosCorrections, void(
       const process::Future<std::list<
           mesos::slave::QoSCorrection>>& correction));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 3481ad2..5c6bed7 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -764,7 +764,7 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
   MockSlave slave(CreateSlaveFlags(), &detector, &containerizer, &controller);
 
   Future<list<QoSCorrection>> qosCorrections;
-  EXPECT_CALL(slave, qosCorrections(_))
+  EXPECT_CALL(slave, _qosCorrections(_))
     .WillOnce(FutureArg<0>(&qosCorrections));
 
   spawn(slave);


[3/3] mesos git commit: Added QoS kill executor correction test.

Posted by nn...@apache.org.
Added QoS kill executor correction test.

Review: https://reviews.apache.org/r/34721


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02160a1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02160a1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02160a1c

Branch: refs/heads/master
Commit: 02160a1ceb7d54b851b623e669e6a648be5471c1
Parents: 8cbbf84
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:46 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:47 2015 -0700

----------------------------------------------------------------------
 src/tests/mesos.cpp                  | 15 ++++++
 src/tests/mesos.hpp                  |  5 ++
 src/tests/oversubscription_tests.cpp | 89 +++++++++++++++++++++++++++++--
 3 files changed, 104 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/02160a1c/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index dbf8c7c..2cd2435 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -374,6 +374,21 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 
 
 Try<PID<slave::Slave>> MesosTest::StartSlave(
+    mesos::slave::QoSController* qoSController,
+    const Option<slave::Flags>& flags)
+{
+  return cluster.slaves.start(
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
+      None(),
+      None(),
+      None(),
+      None(),
+      qoSController);
+}
+
+
+Try<PID<slave::Slave>> MesosTest::StartSlave(
     slave::Containerizer* containerizer,
     mesos::slave::QoSController* qoSController,
     const Option<slave::Flags>& flags)

http://git-wip-us.apache.org/repos/asf/mesos/blob/02160a1c/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 2a96618..9157ac0 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -199,6 +199,11 @@ protected:
       mesos::slave::ResourceEstimator* resourceEstimator,
       const Option<slave::Flags>& flags = None());
 
+  // Starts a slave with the specified QoS Controller and flags.
+  virtual Try<process::PID<slave::Slave>> StartSlave(
+      mesos::slave::QoSController* qosController,
+      const Option<slave::Flags>& flags = None());
+
   // Starts a slave with the specified QoS Controller,
   // containerizer and flags.
   virtual Try<process::PID<slave::Slave>> StartSlave(

http://git-wip-us.apache.org/repos/asf/mesos/blob/02160a1c/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 5c6bed7..c7a2dac 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -68,7 +68,7 @@ using testing::_;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
-using testing::Invoke;
+using testing::InvokeWithoutArgs;
 using testing::Return;
 
 namespace mesos {
@@ -263,7 +263,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
 
   Queue<Resources> estimations;
   EXPECT_CALL(resourceEstimator, oversubscribable())
-    .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
+    .WillOnce(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
 
   slave::Flags flags = CreateSlaveFlags();
   Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
@@ -327,7 +327,7 @@ TEST_F(OversubscriptionTest, RevocableOffer)
 
   Queue<Resources> estimations;
   EXPECT_CALL(resourceEstimator, oversubscribable())
-    .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
+    .WillOnce(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
 
   slave::Flags flags = CreateSlaveFlags();
 
@@ -421,7 +421,7 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
   // We expect 2 calls for 2 estimations.
   EXPECT_CALL(resourceEstimator, oversubscribable())
     .Times(2)
-    .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
+    .WillRepeatedly(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
 
   slave::Flags flags = CreateSlaveFlags();
 
@@ -759,7 +759,9 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
   Queue<list<QoSCorrection>> corrections;
 
   EXPECT_CALL(controller, corrections())
-    .WillRepeatedly(Invoke(&corrections, &Queue<list<QoSCorrection>>::get));
+    .WillRepeatedly(InvokeWithoutArgs(
+        &corrections,
+        &Queue<list<QoSCorrection>>::get));
 
   MockSlave slave(CreateSlaveFlags(), &detector, &containerizer, &controller);
 
@@ -782,6 +784,83 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
   wait(slave);
 }
 
+
+// This test verifies that a QoS controller can kill a running task
+// and that a TASK_LOST with REASON_EXECUTOR_PREEMPTED is sent to the
+// framework.
+TEST_F(OversubscriptionTest, QoSCorrectionKill)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockQoSController controller;
+
+  Queue<list<mesos::slave::QoSCorrection>> corrections;
+
+  EXPECT_CALL(controller, corrections())
+    .WillRepeatedly(InvokeWithoutArgs(
+        &corrections,
+        &Queue<list<mesos::slave::QoSCorrection>>::get));
+
+  Try<PID<Slave>> slave = StartSlave(&controller, CreateSlaveFlags());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 10");
+
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+  // Carry out kill correction.
+  QoSCorrection killCorrection;
+
+  QoSCorrection::Kill* kill = killCorrection.mutable_kill();
+  kill->mutable_framework_id()->CopyFrom(frameworkId.get());
+
+  // As we use a command executor to launch an actual sleep command,
+  // the executor id will be the task id.
+  kill->mutable_executor_id()->set_value(task.task_id().value());
+
+  corrections.put({killCorrection});
+
+  // Verify task status is TASK_LOST.
+  AWAIT_READY(status2);
+  ASSERT_EQ(TASK_LOST, status2.get().state());
+  ASSERT_EQ(TaskStatus::REASON_EXECUTOR_PREEMPTED, status2.get().reason());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {