You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/06/17 02:43:50 UTC
[1/3] mesos git commit: Added REASON_EXECUTOR_PREEMPTED as status reason.
Repository: mesos
Updated Branches:
refs/heads/master fa0d564a9 -> 02160a1ce
Added REASON_EXECUTOR_PREEMPTED as status reason.
Review: https://reviews.apache.org/r/34719
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a7d815c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a7d815c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a7d815c
Branch: refs/heads/master
Commit: 1a7d815cdf4fb959d0b30782dc3024000e44fba8
Parents: fa0d564
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:26 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:26 2015 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1a7d815c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index e5b9884..8df1211 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -883,6 +883,7 @@ message TaskStatus {
// (e.g. unhealthy vs. unregistered for maintenance).
enum Reason {
REASON_COMMAND_EXECUTOR_FAILED = 0;
+ REASON_EXECUTOR_PREEMPTED = 17;
REASON_EXECUTOR_TERMINATED = 1;
REASON_EXECUTOR_UNREGISTERED = 2;
REASON_FRAMEWORK_REMOVED = 3;
[2/3] mesos git commit: Added kill executor correction to slave.
Posted by nn...@apache.org.
Added kill executor correction to slave.
Review: https://reviews.apache.org/r/34720
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cbbf840
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cbbf840
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cbbf840
Branch: refs/heads/master
Commit: 8cbbf84068e02cfb5899de8255ac6227713cf7e0
Parents: 1a7d815
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:35 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:37 2015 -0700
----------------------------------------------------------------------
src/slave/flags.cpp | 8 ++
src/slave/flags.hpp | 1 +
src/slave/slave.cpp | 138 +++++++++++++++++++++++++++---
src/slave/slave.hpp | 10 ++-
src/tests/mesos.cpp | 8 ++
src/tests/mesos.hpp | 6 +-
src/tests/oversubscription_tests.cpp | 2 +-
7 files changed, 160 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 93690cf..cbf431e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -522,6 +522,14 @@ mesos::internal::slave::Flags::Flags()
"qos_controller",
"The name of the QoS Controller to use for oversubscription.");
+ add(&Flags::qos_correction_interval_min,
+ "qos_correction_interval_min",
+ "The slave polls and carries out QoS corrections from the QoS\n"
+ "Controller based on its observed performance of running tasks.\n"
+ "The smallest interval between these corrections is controlled by\n"
+ "this flag.",
+ Seconds(0));
+
add(&Flags::oversubscribed_resources_interval,
"oversubscribed_resources_interval",
"The slave periodically updates the master with the current estimation\n"
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6c24e56..7634e36 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -109,6 +109,7 @@ public:
Option<std::string> hooks;
Option<std::string> resource_estimator;
Option<std::string> qos_controller;
+ Duration qos_correction_interval_min;
Duration oversubscribed_resources_interval;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3614330..19f9013 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -89,6 +89,7 @@
#include "slave/status_update_manager.hpp"
using mesos::slave::QoSController;
+using mesos::slave::QoSCorrection;
using mesos::slave::ResourceEstimator;
using std::list;
@@ -3979,8 +3980,7 @@ void Slave::__recover(const Future<Nothing>& future)
forwardOversubscribed();
// Start acting on correction from QoS Controller.
- qosController->corrections()
- .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+ qosCorrections();
} else {
// Slave started in cleanup mode.
CHECK_EQ("cleanup", flags.recover);
@@ -4127,20 +4127,127 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
}
-void Slave::qosCorrections(
- const Future<list<mesos::slave::QoSCorrection>>& future)
+void Slave::qosCorrections()
{
+ qosController->corrections()
+ .onAny(defer(self(), &Self::_qosCorrections, lambda::_1));
+}
+
+
+void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
+{
+ // Make sure correction handler is scheduled again.
+ delay(flags.qos_correction_interval_min,
+ self(),
+ &Self::qosCorrections);
+
+ // Verify slave state.
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state == RECOVERING || state == TERMINATING) {
+ LOG(WARNING) << "Cannot perform QoS corrections because the slave is "
+ << state;
+ return;
+ }
+
if (!future.isReady()) {
LOG(WARNING) << "Failed to get corrections from QoS Controller: "
<< (future.isFailed() ? future.failure() : "discarded");
- } else {
+ return;
+ }
+
+ const list<QoSCorrection>& corrections = future.get();
+
+ LOG(INFO) << "Received " << corrections.size() << " QoS corrections";
+
+ foreach (const QoSCorrection& correction, corrections) {
// TODO(nnielsen): Print correction, once the operator overload
// for QoSCorrection has been implemented.
- LOG(INFO) << "Received new QoS corrections";
- }
+ if (correction.type() == QoSCorrection::KILL) {
+ const QoSCorrection::Kill& kill = correction.kill();
- qosController->corrections()
- .onAny(defer(self(), &Self::qosCorrections, lambda::_1));
+ if (!kill.has_framework_id()) {
+ LOG(WARNING) << "Ignoring QoS correction KILL: "
+ << "framework id not specified.";
+ continue;
+ }
+
+ const FrameworkID& frameworkId = kill.framework_id();
+
+ if (!kill.has_executor_id()) {
+ // TODO(nnielsen): For now, only executor killing is supported. Check
+ // can be removed when task killing is supported as well.
+ LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+ << frameworkId << ": executor id not specified";
+ continue;
+ }
+
+ const ExecutorID& executorId = kill.executor_id();
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+ << frameworkId << ": framework cannot be found";
+ continue;
+ }
+
+ // Verify framework state.
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING)
+ << framework->state;
+
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Ignoring QoS correction KILL on framework "
+ << frameworkId << ": framework is terminating.";
+ continue;
+ }
+
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor == NULL) {
+ LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+ << executorId << "' of framework " << frameworkId
+ << ": executor cannot be found";
+ continue;
+ }
+
+ switch (executor->state) {
+ case Executor::REGISTERING:
+ case Executor::RUNNING: {
+ LOG(INFO) << "Killing executor '" << executorId
+ << "' of framework " << frameworkId
+ << " as QoS correction";
+
+ // TODO(nnielsen): We should ensure that we are addressing
+ // the _container_ which the QoS controller intended to
+ // kill. Without this check, we may run into a scenario
+ // where the executor has terminated and one with the same
+ // id has started in the interim i.e. running in a different
+ // container than the one the QoS controller targeted
+ // (MESOS-2875).
+ executor->state = Executor::TERMINATING;
+ executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED;
+ containerizer->destroy(executor->containerId);
+ break;
+ }
+ case Executor::TERMINATING:
+ case Executor::TERMINATED:
+ LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
+ << executorId << "' of framework " << frameworkId
+ << ": executor is " << executor->state;
+ break;
+ default:
+ LOG(FATAL) << " Executor '" << executor->id
+ << "' of framework " << framework->id()
+ << " is in unexpected state " << executor->state;
+ break;
+ }
+ } else {
+ LOG(WARNING) << "QoS correction type " << correction.type()
+ << " is not supported";
+ }
+ }
}
@@ -4305,7 +4412,18 @@ void Slave::sendExecutorTerminatedStatusUpdate(
mesos::TaskState taskState = TASK_LOST;
TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
- if (termination.isReady() && termination.get().killed()) {
+ CHECK_NOTNULL(executor);
+
+ if (executor->reason.isSome()) {
+ // TODO(nnielsen): We want to dispatch the task status and reason
+ // from the termination reason (MESOS-2035) and the executor
+ // reason based on a specific policy i.e. if the termination
+ // reason is set, this overrides executor->reason. At the moment,
+ // we infer the containerizer reason for killing from 'killed'
+ // field in 'termination' and are explicitly overriding the task
+ // status and reason.
+ reason = executor->reason.get();
+ } else if (termination.isReady() && termination.get().killed()) {
taskState = TASK_FAILED;
// TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination.
reason = TaskStatus::REASON_MEMORY_LIMIT;
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dbed46d..f1cf3b8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -354,7 +354,10 @@ public:
void signaled(int signal, int uid);
// Made 'virtual' for Slave mocking.
- virtual void qosCorrections(
+ virtual void qosCorrections();
+
+ // Made 'virtual' for Slave mocking.
+ virtual void _qosCorrections(
const process::Future<std::list<
mesos::slave::QoSCorrection>>& correction);
@@ -605,6 +608,11 @@ struct Executor
// attempts to do some memset's which are unsafe).
boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
+ // The 'reason' is for the slave to encode the reason behind a
+ // terminal status update for those pending/unterminated tasks when
+ // the executor is terminated.
+ Option<TaskStatus::Reason> reason;
+
private:
Executor(const Executor&); // No copying.
Executor& operator = (const Executor&); // No assigning.
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 509f9f2..dbf8c7c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -455,6 +455,8 @@ MockSlave::MockSlave(const slave::Flags& flags,
.WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
EXPECT_CALL(*this, __recover(_))
.WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover));
+ EXPECT_CALL(*this, qosCorrections())
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked_qosCorrections));
}
@@ -506,6 +508,12 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
}
+void MockSlave::unmocked_qosCorrections()
+{
+ slave::Slave::qosCorrections();
+}
+
+
MockFetcherProcess::MockFetcherProcess()
{
// Set up default behaviors, calling the original methods.
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ecdf910..2a96618 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -849,7 +849,11 @@ public:
void unmocked___recover(
const process::Future<Nothing>& future);
- MOCK_METHOD1(qosCorrections, void(
+ MOCK_METHOD0(qosCorrections, void());
+
+ void unmocked_qosCorrections();
+
+ MOCK_METHOD1(_qosCorrections, void(
const process::Future<std::list<
mesos::slave::QoSCorrection>>& correction));
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 3481ad2..5c6bed7 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -764,7 +764,7 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
MockSlave slave(CreateSlaveFlags(), &detector, &containerizer, &controller);
Future<list<QoSCorrection>> qosCorrections;
- EXPECT_CALL(slave, qosCorrections(_))
+ EXPECT_CALL(slave, _qosCorrections(_))
.WillOnce(FutureArg<0>(&qosCorrections));
spawn(slave);
[3/3] mesos git commit: Added QoS kill executor correction test.
Posted by nn...@apache.org.
Added QoS kill executor correction test.
Review: https://reviews.apache.org/r/34721
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02160a1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02160a1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02160a1c
Branch: refs/heads/master
Commit: 02160a1ceb7d54b851b623e669e6a648be5471c1
Parents: 8cbbf84
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Tue Jun 16 17:02:46 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Tue Jun 16 17:02:47 2015 -0700
----------------------------------------------------------------------
src/tests/mesos.cpp | 15 ++++++
src/tests/mesos.hpp | 5 ++
src/tests/oversubscription_tests.cpp | 89 +++++++++++++++++++++++++++++--
3 files changed, 104 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/02160a1c/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index dbf8c7c..2cd2435 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -374,6 +374,21 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
Try<PID<slave::Slave>> MesosTest::StartSlave(
+ mesos::slave::QoSController* qoSController,
+ const Option<slave::Flags>& flags)
+{
+ return cluster.slaves.start(
+ flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
+ None(),
+ None(),
+ None(),
+ None(),
+ qoSController);
+}
+
+
+Try<PID<slave::Slave>> MesosTest::StartSlave(
slave::Containerizer* containerizer,
mesos::slave::QoSController* qoSController,
const Option<slave::Flags>& flags)
http://git-wip-us.apache.org/repos/asf/mesos/blob/02160a1c/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 2a96618..9157ac0 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -199,6 +199,11 @@ protected:
mesos::slave::ResourceEstimator* resourceEstimator,
const Option<slave::Flags>& flags = None());
+ // Starts a slave with the specified QoS Controller and flags.
+ virtual Try<process::PID<slave::Slave>> StartSlave(
+ mesos::slave::QoSController* qosController,
+ const Option<slave::Flags>& flags = None());
+
// Starts a slave with the specified QoS Controller,
// containerizer and flags.
virtual Try<process::PID<slave::Slave>> StartSlave(
http://git-wip-us.apache.org/repos/asf/mesos/blob/02160a1c/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 5c6bed7..c7a2dac 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -68,7 +68,7 @@ using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
-using testing::Invoke;
+using testing::InvokeWithoutArgs;
using testing::Return;
namespace mesos {
@@ -263,7 +263,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
Queue<Resources> estimations;
EXPECT_CALL(resourceEstimator, oversubscribable())
- .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
+ .WillOnce(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
slave::Flags flags = CreateSlaveFlags();
Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
@@ -327,7 +327,7 @@ TEST_F(OversubscriptionTest, RevocableOffer)
Queue<Resources> estimations;
EXPECT_CALL(resourceEstimator, oversubscribable())
- .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
+ .WillOnce(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
slave::Flags flags = CreateSlaveFlags();
@@ -421,7 +421,7 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
// We expect 2 calls for 2 estimations.
EXPECT_CALL(resourceEstimator, oversubscribable())
.Times(2)
- .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
+ .WillRepeatedly(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
slave::Flags flags = CreateSlaveFlags();
@@ -759,7 +759,9 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
Queue<list<QoSCorrection>> corrections;
EXPECT_CALL(controller, corrections())
- .WillRepeatedly(Invoke(&corrections, &Queue<list<QoSCorrection>>::get));
+ .WillRepeatedly(InvokeWithoutArgs(
+ &corrections,
+ &Queue<list<QoSCorrection>>::get));
MockSlave slave(CreateSlaveFlags(), &detector, &containerizer, &controller);
@@ -782,6 +784,83 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
wait(slave);
}
+
+// This test verifies that a QoS controller can kill a running task
+// and that a TASK_LOST with REASON_EXECUTOR_PREEMPTED is sent to the
+// framework.
+TEST_F(OversubscriptionTest, QoSCorrectionKill)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockQoSController controller;
+
+ Queue<list<mesos::slave::QoSCorrection>> corrections;
+
+ EXPECT_CALL(controller, corrections())
+ .WillRepeatedly(InvokeWithoutArgs(
+ &corrections,
+ &Queue<list<mesos::slave::QoSCorrection>>::get));
+
+ Try<PID<Slave>> slave = StartSlave(&controller, CreateSlaveFlags());
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 10");
+
+ Future<TaskStatus> status1;
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status1))
+ .WillOnce(FutureArg<1>(&status2))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(status1);
+ ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+ // Carry out kill correction.
+ QoSCorrection killCorrection;
+
+ QoSCorrection::Kill* kill = killCorrection.mutable_kill();
+ kill->mutable_framework_id()->CopyFrom(frameworkId.get());
+
+ // As we use a command executor to launch an actual sleep command,
+ // the executor id will be the task id.
+ kill->mutable_executor_id()->set_value(task.task_id().value());
+
+ corrections.put({killCorrection});
+
+ // Verify task status is TASK_LOST.
+ AWAIT_READY(status2);
+ ASSERT_EQ(TASK_LOST, status2.get().state());
+ ASSERT_EQ(TaskStatus::REASON_EXECUTOR_PREEMPTED, status2.get().reason());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
} // namespace tests {
} // namespace internal {
} // namespace mesos {