You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:53 UTC
[3/8] mesos git commit: Introduced explicit status update
acknowledgements on the driver.
Introduced explicit status update acknowledgements on the driver.
Review: https://reviews.apache.org/r/30971
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ff4397a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ff4397a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ff4397a6
Branch: refs/heads/master
Commit: ff4397a6c278d32b3523ed9cae8d43b8a7772278
Parents: 1026991
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:25:51 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800
----------------------------------------------------------------------
include/mesos/mesos.proto | 10 ++
include/mesos/scheduler.hpp | 44 ++++++++-
src/sched/sched.cpp | 192 +++++++++++++++++++++++++++++++++------
src/scheduler/scheduler.cpp | 2 +
4 files changed, 217 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 507845c..14ff7f9 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -784,6 +784,16 @@ message TaskStatus {
optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave.
optional double timestamp = 6;
+ // Statuses that are delivered reliably to the scheduler will
+ // include a 'uuid'. The status is considered delivered once
+ // it is acknowledged by the scheduler. Schedulers can choose
+ // to either explicitly acknowledge statuses or let the scheduler
+ // driver implicitly acknowledge (default).
+ //
+ // TODO(bmahler): This is currently overwritten in the scheduler
+ // driver, even if executors set this.
+ optional bytes uuid = 11;
+
// Describes whether the task has been determined to be healthy
// (true) or unhealthy (false) according to the HealthCheck field in
// the command info.
http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 31256c1..f24ec80 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -123,12 +123,15 @@ public:
// Invoked when the status of a task has changed (e.g., a slave is
// lost and so the task is lost, a task finishes and an executor
- // sends a status update saying so, etc). Note that returning from
- // this callback _acknowledges_ receipt of this status update! If
- // for whatever reason the scheduler aborts during this callback (or
+ // sends a status update saying so, etc). If implicit
+ // acknowledgements are being used, then returning from this
+ // callback _acknowledges_ receipt of this status update! If for
+ // whatever reason the scheduler aborts during this callback (or
// the process exits) another status update will be delivered (note,
// however, that this is currently not true if the slave sending the
- // status update is lost/fails during that time).
+ // status update is lost/fails during that time). If explicit
+ // acknowledgements are in use, the scheduler must acknowledge this
+ // status on the driver.
virtual void statusUpdate(
SchedulerDriver* driver,
const TaskStatus& status) = 0;
@@ -269,6 +272,14 @@ public:
// those filtered slaves.
virtual Status reviveOffers() = 0;
+ // Acknowledges the status update. This should only be called
+ // once the status update is processed durably by the scheduler.
+ // Note that explicit acknowledgements must be requested via the
+ // constructor argument, otherwise a call to this method will
+ // cause the driver to crash.
+ virtual Status acknowledgeStatusUpdate(
+ const TaskStatus& status) = 0;
+
// Sends a message from the framework to one of its executors. These
// messages are best effort; do not expect a framework message to be
// retransmitted in any reliable fashion.
@@ -348,6 +359,26 @@ public:
const std::string& master,
const Credential& credential);
+ // These constructors are the same as the above two, but allow
+ // the framework to specify whether implicit or explicit
+ // acknowledgements are desired. See statusUpdate() for the
+ // details about explicit acknowledgements.
+ //
+ // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
+ // these new constructors are exposed.
+ MesosSchedulerDriver(
+ Scheduler* scheduler,
+ const FrameworkInfo& framework,
+ const std::string& master,
+ bool implicitAcknowledgements);
+
+ MesosSchedulerDriver(
+ Scheduler* scheduler,
+ const FrameworkInfo& framework,
+ const std::string& master,
+ bool implicitAcknowlegements,
+ const Credential& credential);
+
// This destructor will block indefinitely if
// MesosSchedulerDriver::start was invoked successfully (possibly
// via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
@@ -389,6 +420,9 @@ public:
virtual Status reviveOffers();
+ virtual Status acknowledgeStatusUpdate(
+ const TaskStatus& status);
+
virtual Status sendFrameworkMessage(
const ExecutorID& executorId,
const SlaveID& slaveId,
@@ -423,6 +457,8 @@ private:
// Current status of the driver.
Status status;
+ const bool implicitAcknowlegements;
+
const Credential* credential;
// Scheduler process ID.
http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index ea7e447..280eaeb 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -54,6 +54,7 @@
#include <process/metrics/gauge.hpp>
#include <process/metrics/metrics.hpp>
+#include <stout/abort.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
@@ -115,6 +116,7 @@ public:
Scheduler* _scheduler,
const FrameworkInfo& _framework,
const Option<Credential>& _credential,
+ bool _implicitAcknowledgements,
const string& schedulerId,
MasterDetector* _detector,
const scheduler::Flags& _flags,
@@ -142,6 +144,7 @@ public:
running(true),
detector(_detector),
flags(_flags),
+ implicitAcknowledgements(_implicitAcknowledgements),
credential(_credential),
authenticatee(NULL),
authenticating(None()),
@@ -647,8 +650,6 @@ protected:
const StatusUpdate& update,
const UPID& pid)
{
- const TaskStatus& status = update.status();
-
if (!running) {
VLOG(1) << "Ignoring task status update message because "
<< "the driver is not running!";
@@ -686,6 +687,23 @@ protected:
// multiple times (of course, if a scheduler re-uses a TaskID,
// that could be bad.
+ TaskStatus status = update.status();
+
+ // If the update is driver-generated or master-generated, it
+ // does not require acknowledgement and so we unset the 'uuid'
+ // field of TaskStatus. Otherwise, we overwrite the field to
+ // ensure that a 0.22.0 scheduler driver supports explicit
+ // acknowledgements, even if running against a 0.21.0 cluster.
+ //
+ // TODO(bmahler): Update the slave / executor driver to ensure
+ // that 'uuid' is set accurately by the time it reaches the
+ // scheduler driver. This will be required for pure bindings.
+ if (from == UPID() || pid == UPID()) {
+ status.clear_uuid();
+ } else {
+ status.set_uuid(update.uuid());
+ }
+
Stopwatch stopwatch;
if (FLAGS_v >= 1) {
stopwatch.start();
@@ -695,30 +713,32 @@ protected:
VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();
- // Note that we need to look at the volatile 'aborted' here to
- // so that we don't acknowledge the update if the driver was
- // aborted during the processing of the update.
- if (!running) {
- VLOG(1) << "Not sending status update acknowledgment message because "
- << "the driver is not running!";
- return;
- }
-
- // Don't acknowledge updates created by the driver or master.
- if (from != UPID() && pid != UPID()) {
- // We drop updates while we're disconnected.
- CHECK(connected);
- CHECK_SOME(master);
-
- VLOG(2) << "Sending ACK for status update " << update
- << " to " << master.get();
+ if (implicitAcknowledgements) {
+ // Note that we need to look at the volatile 'running' here
+ // so that we don't acknowledge the update if the driver was
+ // aborted during the processing of the update.
+ if (!running) {
+ VLOG(1) << "Not sending status update acknowledgment message because "
+ << "the driver is not running!";
+ return;
+ }
- StatusUpdateAcknowledgementMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
- message.mutable_slave_id()->MergeFrom(update.slave_id());
- message.mutable_task_id()->MergeFrom(update.status().task_id());
- message.set_uuid(update.uuid());
- send(master.get(), message);
+ // Don't acknowledge updates created by the driver or master.
+ if (from != UPID() && pid != UPID()) {
+ // We drop updates while we're disconnected.
+ CHECK(connected);
+ CHECK_SOME(master);
+
+ VLOG(2) << "Sending ACK for status update " << update
+ << " to " << master.get();
+
+ StatusUpdateAcknowledgementMessage message;
+ message.mutable_framework_id()->MergeFrom(framework.id());
+ message.mutable_slave_id()->MergeFrom(update.slave_id());
+ message.mutable_task_id()->MergeFrom(update.status().task_id());
+ message.set_uuid(update.uuid());
+ send(master.get(), message);
+ }
}
}
@@ -1072,6 +1092,53 @@ protected:
send(master.get(), message);
}
+ void acknowledgeStatusUpdate(
+ const TaskStatus& status)
+ {
+ // The driver should abort before allowing an acknowledgement
+ // call when implicit acknowledgements are enabled. We further
+ // enforce that the driver is denying the call through this CHECK.
+ CHECK(!implicitAcknowledgements);
+
+ if (!connected) {
+ VLOG(1) << "Ignoring explicit status update acknowledgement"
+ " because the driver is disconnected";
+ return;
+ }
+
+ CHECK_SOME(master);
+
+ // NOTE: By ignoring the volatile 'running' here, we ensure that
+ // all acknowledgements requested before the driver was stopped
+ // or aborted are processed. Any acknowledgement that is requested
+ // after the driver stops or aborts (running == false) will be
+ // dropped in the driver before reaching here.
+
+ // Only statuses with a 'uuid' and a 'slave_id' need to have
+ // acknowledgements sent to the master. Note that the driver
+ // ensures that master-generated and driver-generated updates
+ // will not have a 'uuid' set.
+ if (status.has_uuid() && status.has_slave_id()) {
+ VLOG(2) << "Sending ACK for status update " << status.uuid()
+ << " of task " << status.task_id()
+ << " on slave " << status.slave_id()
+ << " to " << master.get();
+
+ StatusUpdateAcknowledgementMessage message;
+ message.mutable_framework_id()->CopyFrom(framework.id());
+ message.mutable_slave_id()->CopyFrom(status.slave_id());
+ message.mutable_task_id()->CopyFrom(status.task_id());
+ message.set_uuid(status.uuid());
+ send(master.get(), message);
+ } else {
+ VLOG(2) << "Received ACK for status update"
+ << (status.has_uuid() ? " " + status.uuid() : "")
+ << " of task " << status.task_id()
+ << (status.has_slave_id()
+ ? " on slave " + stringify(status.slave_id()) : "");
+ }
+ }
+
void sendFrameworkMessage(const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data)
@@ -1204,6 +1271,13 @@ private:
hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
hashmap<SlaveID, UPID> savedSlavePids;
+ // The driver optionally provides implicit acknowledgements
+ // for frameworks. If disabled, the framework must send its
+ // own acknowledgements through the driver, when the 'uuid'
+ // of the TaskStatus is set (which also implies the 'slave_id'
+ // is set).
+ bool implicitAcknowledgements;
+
const Option<Credential> credential;
Authenticatee* authenticatee;
@@ -1319,6 +1393,45 @@ MesosSchedulerDriver::MesosSchedulerDriver(
master(_master),
process(NULL),
status(DRIVER_NOT_STARTED),
+ implicitAcknowlegements(true),
+ credential(NULL),
+ schedulerId("scheduler-" + UUID::random().toString())
+{
+ initialize();
+}
+
+
+MesosSchedulerDriver::MesosSchedulerDriver(
+ Scheduler* _scheduler,
+ const FrameworkInfo& _framework,
+ const string& _master,
+ const Credential& _credential)
+ : detector(NULL),
+ scheduler(_scheduler),
+ framework(_framework),
+ master(_master),
+ process(NULL),
+ status(DRIVER_NOT_STARTED),
+ implicitAcknowlegements(true),
+ credential(new Credential(_credential)),
+ schedulerId("scheduler-" + UUID::random().toString())
+{
+ initialize();
+}
+
+
+MesosSchedulerDriver::MesosSchedulerDriver(
+ Scheduler* _scheduler,
+ const FrameworkInfo& _framework,
+ const string& _master,
+ bool _implicitAcknowlegements)
+ : detector(NULL),
+ scheduler(_scheduler),
+ framework(_framework),
+ master(_master),
+ process(NULL),
+ status(DRIVER_NOT_STARTED),
+ implicitAcknowlegements(_implicitAcknowlegements),
credential(NULL),
schedulerId("scheduler-" + UUID::random().toString())
{
@@ -1326,12 +1439,11 @@ MesosSchedulerDriver::MesosSchedulerDriver(
}
-// The implementation of this is same as the above constructor
-// except that the SchedulerProcess is passed the credential.
MesosSchedulerDriver::MesosSchedulerDriver(
Scheduler* _scheduler,
const FrameworkInfo& _framework,
const string& _master,
+ bool _implicitAcknowlegements,
const Credential& _credential)
: detector(NULL),
scheduler(_scheduler),
@@ -1339,6 +1451,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
master(_master),
process(NULL),
status(DRIVER_NOT_STARTED),
+ implicitAcknowlegements(_implicitAcknowlegements),
credential(new Credential(_credential)),
schedulerId("scheduler-" + UUID::random().toString())
{
@@ -1438,6 +1551,7 @@ Status MesosSchedulerDriver::start()
scheduler,
framework,
None(),
+ implicitAcknowlegements,
schedulerId,
detector,
flags,
@@ -1450,6 +1564,7 @@ Status MesosSchedulerDriver::start()
scheduler,
framework,
cred,
+ implicitAcknowlegements,
schedulerId,
detector,
flags,
@@ -1644,6 +1759,29 @@ Status MesosSchedulerDriver::reviveOffers()
}
+Status MesosSchedulerDriver::acknowledgeStatusUpdate(
+ const TaskStatus& taskStatus)
+{
+ Lock lock(&mutex);
+
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
+
+ // TODO(bmahler): Should this use abort() instead?
+ if (implicitAcknowlegements) {
+ ABORT("Cannot call acknowledgeStatusUpdate:"
+ " Implicit acknowledgements are enabled");
+ }
+
+ CHECK(process != NULL);
+
+ dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+
+ return status;
+}
+
+
Status MesosSchedulerDriver::sendFrameworkMessage(
const ExecutorID& executorId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 5816569..23658c8 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -689,6 +689,7 @@ protected:
update->mutable_status()->set_timestamp(message.update().timestamp());
update->set_uuid(message.update().uuid());
+ update->mutable_status()->set_uuid(message.update().uuid());
receive(from, event);
}
@@ -761,6 +762,7 @@ protected:
status->set_timestamp(Clock::now().secs());
update->set_uuid(UUID::random().toBytes());
+ status->set_uuid(update->uuid());
receive(None(), event);
}