You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/06/17 02:43:51 UTC
[2/3] mesos git commit: Added kill executor correction to slave.
Added kill executor correction to slave.
Review: https://reviews.apache.org/r/34720
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cbbf840
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cbbf840
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cbbf840
Branch: refs/heads/master
Commit: 8cbbf84068e02cfb5899de8255ac6227713cf7e0
Parents: 1a7d815
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:35 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:37 2015 -0700
----------------------------------------------------------------------
src/slave/flags.cpp | 8 ++
src/slave/flags.hpp | 1 +
src/slave/slave.cpp | 138 +++++++++++++++++++++++++++---
src/slave/slave.hpp | 10 ++-
src/tests/mesos.cpp | 8 ++
src/tests/mesos.hpp | 6 +-
src/tests/oversubscription_tests.cpp | 2 +-
7 files changed, 160 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 93690cf..cbf431e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -522,6 +522,14 @@ mesos::internal::slave::Flags::Flags()
"qos_controller",
"The name of the QoS Controller to use for oversubscription.");
+ add(&Flags::qos_correction_interval_min,
+ "qos_correction_interval_min",
+ "The slave polls and carries out QoS corrections from the QoS\n"
+ "Controller based on its observed performance of running tasks.\n"
+ "The smallest interval between these corrections is controlled by\n"
+ "this flag.",
+ Seconds(0));
+
add(&Flags::oversubscribed_resources_interval,
"oversubscribed_resources_interval",
"The slave periodically updates the master with the current estimation\n"
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6c24e56..7634e36 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -109,6 +109,7 @@ public:
Option<std::string> hooks;
Option<std::string> resource_estimator;
Option<std::string> qos_controller;
+ Duration qos_correction_interval_min;
Duration oversubscribed_resources_interval;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3614330..19f9013 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -89,6 +89,7 @@
#include "slave/status_update_manager.hpp"
using mesos::slave::QoSController;
+using mesos::slave::QoSCorrection;
using mesos::slave::ResourceEstimator;
using std::list;
@@ -3979,8 +3980,7 @@ void Slave::__recover(const Future<Nothing>& future)
forwardOversubscribed();
// Start acting on correction from QoS Controller.
- qosController->corrections()
- .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+ qosCorrections();
} else {
// Slave started in cleanup mode.
CHECK_EQ("cleanup", flags.recover);
@@ -4127,20 +4127,127 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
}
-void Slave::qosCorrections(
- const Future<list<mesos::slave::QoSCorrection>>& future)
+void Slave::qosCorrections()
{
+ qosController->corrections()
+ .onAny(defer(self(), &Self::_qosCorrections, lambda::_1));
+}
+
+
+void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
+{
+ // Make sure correction handler is scheduled again.
+ delay(flags.qos_correction_interval_min,
+ self(),
+ &Self::qosCorrections);
+
+ // Verify slave state.
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state == RECOVERING || state == TERMINATING) {
+ LOG(WARNING) << "Cannot perform QoS corrections because the slave is "
+ << state;
+ return;
+ }
+
if (!future.isReady()) {
LOG(WARNING) << "Failed to get corrections from QoS Controller: "
<< (future.isFailed() ? future.failure() : "discarded");
- } else {
+ return;
+ }
+
+ const list<QoSCorrection>& corrections = future.get();
+
+ LOG(INFO) << "Received " << corrections.size() << " QoS corrections";
+
+ foreach (const QoSCorrection& correction, corrections) {
// TODO(nnielsen): Print correction, once the operator overload
// for QoSCorrection has been implemented.
- LOG(INFO) << "Received new QoS corrections";
- }
+ if (correction.type() == QoSCorrection::KILL) {
+ const QoSCorrection::Kill& kill = correction.kill();
- qosController->corrections()
- .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+ if (!kill.has_framework_id()) {
+ LOG(WARNING) << "Ignoring QoS correction KILL: "
+ << "framework id not specified.";
+ continue;
+ }
+
+ const FrameworkID& frameworkId = kill.framework_id();
+
+ if (!kill.has_executor_id()) {
+ // TODO(nnielsen): For now, only executor killing is supported. This
+ // check can be removed when task killing is supported as well.
+ LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+ << frameworkId << ": executor id not specified";
+ continue;
+ }
+
+ const ExecutorID& executorId = kill.executor_id();
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+ << frameworkId << ": framework cannot be found";
+ continue;
+ }
+
+ // Verify framework state.
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING)
+ << framework->state;
+
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+ << frameworkId << ": framework is terminating.";
+ continue;
+ }
+
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor == NULL) {
+ LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+ << executorId << "' of framework " << frameworkId
+ << ": executor cannot be found";
+ continue;
+ }
+
+ switch (executor->state) {
+ case Executor::REGISTERING:
+ case Executor::RUNNING: {
+ LOG(INFO) << "Killing executor '" << executorId
+ << "' of framework " << frameworkId
+ << " as QoS correction";
+
+ // TODO(nnielsen): We should ensure that we are addressing
+ // the _container_ which the QoS controller intended to
+ // kill. Without this check, we may run into a scenario
+ // where the executor has terminated and one with the same
+ // id has started in the interim i.e. running in a different
+ // container than the one the QoS controller targeted
+ // (MESOS-2875).
+ executor->state = Executor::TERMINATING;
+ executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED;
+ containerizer->destroy(executor->containerId);
+ break;
+ }
+ case Executor::TERMINATING:
+ case Executor::TERMINATED:
+ LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+ << executorId << "' of framework " << frameworkId
+ << ": executor is " << executor->state;
+ break;
+ default:
+ LOG(FATAL) << " Executor '" << executor->id
+ << "' of framework " << framework->id()
+ << " is in unexpected state " << executor->state;
+ break;
+ }
+ } else {
+ LOG(WARNING) << "QoS correction type " << correction.type()
+ << " is not supported";
+ }
+ }
}
@@ -4305,7 +4412,18 @@ void Slave::sendExecutorTerminatedStatusUpdate(
mesos::TaskState taskState = TASK_LOST;
TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
- if (termination.isReady() && termination.get().killed()) {
+ CHECK_NOTNULL(executor);
+
+ if (executor->reason.isSome()) {
+ // TODO(nnielsen): We want to dispatch the task status and reason
+ // from the termination reason (MESOS-2035) and the executor
+ // reason based on a specific policy, i.e., if the termination
+ // reason is set, this overrides executor->reason. At the moment,
+ // we infer the containerizer reason for killing from the 'killed'
+ // field in 'termination' and are explicitly overriding the task
+ // status and reason.
+ reason = executor->reason.get();
+ } else if (termination.isReady() && termination.get().killed()) {
taskState = TASK_FAILED;
// TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination.
reason = TaskStatus::REASON_MEMORY_LIMIT;
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dbed46d..f1cf3b8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -354,7 +354,10 @@ public:
void signaled(int signal, int uid);
// Made 'virtual' for Slave mocking.
- virtual void qosCorrections(
+ virtual void qosCorrections();
+
+ // Made 'virtual' for Slave mocking.
+ virtual void _qosCorrections(
const process::Future<std::list<
mesos::slave::QoSCorrection>>& correction);
@@ -605,6 +608,11 @@ struct Executor
// attempts to do some memset's which are unsafe).
boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
+ // The 'reason' is for the slave to encode the reason behind a
+ // terminal status update for those pending/unterminated tasks when
+ // the executor is terminated.
+ Option<TaskStatus::Reason> reason;
+
private:
Executor(const Executor&); // No copying.
Executor& operator = (const Executor&); // No assigning.
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 509f9f2..dbf8c7c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -455,6 +455,8 @@ MockSlave::MockSlave(const slave::Flags& flags,
.WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
EXPECT_CALL(*this, __recover(_))
.WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover));
+ EXPECT_CALL(*this, qosCorrections())
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked_qosCorrections));
}
@@ -506,6 +508,12 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
}
+void MockSlave::unmocked_qosCorrections()
+{
+ slave::Slave::qosCorrections();
+}
+
+
MockFetcherProcess::MockFetcherProcess()
{
// Set up default behaviors, calling the original methods.
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ecdf910..2a96618 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -849,7 +849,11 @@ public:
void unmocked___recover(
const process::Future<Nothing>& future);
- MOCK_METHOD1(qosCorrections, void(
+ MOCK_METHOD0(qosCorrections, void());
+
+ void unmocked_qosCorrections();
+
+ MOCK_METHOD1(_qosCorrections, void(
const process::Future<std::list<
mesos::slave::QoSCorrection>>& correction));
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 3481ad2..5c6bed7 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -764,7 +764,7 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
MockSlave slave(CreateSlaveFlags(), &detector, &containerizer, &controller);
Future<list<QoSCorrection>> qosCorrections;
- EXPECT_CALL(slave, qosCorrections(_))
+ EXPECT_CALL(slave, _qosCorrections(_))
.WillOnce(FutureArg<0>(&qosCorrections));
spawn(slave);