You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:53 UTC

[3/8] mesos git commit: Introduced explicit status update acknowledgements on the driver.

Introduced explicit status update acknowledgements on the driver.

Review: https://reviews.apache.org/r/30971


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ff4397a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ff4397a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ff4397a6

Branch: refs/heads/master
Commit: ff4397a6c278d32b3523ed9cae8d43b8a7772278
Parents: 1026991
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:25:51 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto   |  10 ++
 include/mesos/scheduler.hpp |  44 ++++++++-
 src/sched/sched.cpp         | 192 +++++++++++++++++++++++++++++++++------
 src/scheduler/scheduler.cpp |   2 +
 4 files changed, 217 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 507845c..14ff7f9 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -784,6 +784,16 @@ message TaskStatus {
   optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave.
   optional double timestamp = 6;
 
+  // Statuses that are delivered reliably to the scheduler will
+  // include a 'uuid'. The status is considered delivered once
+  // it is acknowledged by the scheduler. Schedulers can choose
+  // to either explicitly acknowledge statuses or let the scheduler
+  // driver implicitly acknowledge (default).
+  //
+  // TODO(bmahler): This is currently overwritten in the scheduler
+  // driver, even if executors set this.
+  optional bytes uuid = 11;
+
   // Describes whether the task has been determined to be healthy
   // (true) or unhealthy (false) according to the HealthCheck field in
   // the command info.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 31256c1..f24ec80 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -123,12 +123,15 @@ public:
 
   // Invoked when the status of a task has changed (e.g., a slave is
   // lost and so the task is lost, a task finishes and an executor
-  // sends a status update saying so, etc). Note that returning from
-  // this callback _acknowledges_ receipt of this status update! If
-  // for whatever reason the scheduler aborts during this callback (or
+  // sends a status update saying so, etc). If implicit
+  // acknowledgements are being used, then returning from this
+  // callback _acknowledges_ receipt of this status update! If for
+  // whatever reason the scheduler aborts during this callback (or
   // the process exits) another status update will be delivered (note,
   // however, that this is currently not true if the slave sending the
-  // status update is lost/fails during that time).
+  // status update is lost/fails during that time). If explicit
+  // acknowledgements are in use, the scheduler must acknowledge this
+  // status on the driver.
   virtual void statusUpdate(
       SchedulerDriver* driver,
       const TaskStatus& status) = 0;
@@ -269,6 +272,14 @@ public:
   // those filtered slaves.
   virtual Status reviveOffers() = 0;
 
+  // Acknowledges the status update. This should only be called
+  // once the status update is processed durably by the scheduler.
+  // Note that explicit acknowledgements must be requested via the
+  // constructor argument, otherwise a call to this method will
+  // cause the driver to crash.
+  virtual Status acknowledgeStatusUpdate(
+      const TaskStatus& status) = 0;
+
   // Sends a message from the framework to one of its executors. These
   // messages are best effort; do not expect a framework message to be
   // retransmitted in any reliable fashion.
@@ -348,6 +359,26 @@ public:
       const std::string& master,
       const Credential& credential);
 
+  // These constructors are the same as the above two, but allow
+  // the framework to specify whether implicit or explicit
+  // acknowledgements are desired. See statusUpdate() for the
+  // details about explicit acknowledgements.
+  //
+  // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
+  // these new constructors are exposed.
+  MesosSchedulerDriver(
+      Scheduler* scheduler,
+      const FrameworkInfo& framework,
+      const std::string& master,
+      bool implicitAcknowledgements);
+
+  MesosSchedulerDriver(
+      Scheduler* scheduler,
+      const FrameworkInfo& framework,
+      const std::string& master,
+      bool implicitAcknowlegements,
+      const Credential& credential);
+
   // This destructor will block indefinitely if
   // MesosSchedulerDriver::start was invoked successfully (possibly
   // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
@@ -389,6 +420,9 @@ public:
 
   virtual Status reviveOffers();
 
+  virtual Status acknowledgeStatusUpdate(
+      const TaskStatus& status);
+
   virtual Status sendFrameworkMessage(
       const ExecutorID& executorId,
       const SlaveID& slaveId,
@@ -423,6 +457,8 @@ private:
   // Current status of the driver.
   Status status;
 
+  const bool implicitAcknowlegements;
+
   const Credential* credential;
 
   // Scheduler process ID.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index ea7e447..280eaeb 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -54,6 +54,7 @@
 #include <process/metrics/gauge.hpp>
 #include <process/metrics/metrics.hpp>
 
+#include <stout/abort.hpp>
 #include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
@@ -115,6 +116,7 @@ public:
                    Scheduler* _scheduler,
                    const FrameworkInfo& _framework,
                    const Option<Credential>& _credential,
+                   bool _implicitAcknowledgements,
                    const string& schedulerId,
                    MasterDetector* _detector,
                    const scheduler::Flags& _flags,
@@ -142,6 +144,7 @@ public:
       running(true),
       detector(_detector),
       flags(_flags),
+      implicitAcknowledgements(_implicitAcknowledgements),
       credential(_credential),
       authenticatee(NULL),
       authenticating(None()),
@@ -647,8 +650,6 @@ protected:
       const StatusUpdate& update,
       const UPID& pid)
   {
-    const TaskStatus& status = update.status();
-
     if (!running) {
       VLOG(1) << "Ignoring task status update message because "
               << "the driver is not running!";
@@ -686,6 +687,23 @@ protected:
     // multiple times (of course, if a scheduler re-uses a TaskID,
     // that could be bad.
 
+    TaskStatus status = update.status();
+
+    // If the update is driver-generated or master-generated, it
+    // does not require acknowledgement and so we unset the 'uuid'
+    // field of TaskStatus. Otherwise, we overwrite the field to
+    // ensure that a 0.22.0 scheduler driver supports explicit
+    // acknowledgements, even if running against a 0.21.0 cluster.
+    //
+    // TODO(bmahler): Update the slave / executor driver to ensure
+    // that 'uuid' is set accurately by the time it reaches the
+    // scheduler driver. This will be required for pure bindings.
+    if (from == UPID() || pid == UPID()) {
+      status.clear_uuid();
+    } else {
+      status.set_uuid(update.uuid());
+    }
+
     Stopwatch stopwatch;
     if (FLAGS_v >= 1) {
       stopwatch.start();
@@ -695,30 +713,32 @@ protected:
 
     VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();
 
-    // Note that we need to look at the volatile 'aborted' here to
-    // so that we don't acknowledge the update if the driver was
-    // aborted during the processing of the update.
-    if (!running) {
-      VLOG(1) << "Not sending status update acknowledgment message because "
-              << "the driver is not running!";
-      return;
-    }
-
-    // Don't acknowledge updates created by the driver or master.
-    if (from != UPID() && pid != UPID()) {
-      // We drop updates while we're disconnected.
-      CHECK(connected);
-      CHECK_SOME(master);
-
-      VLOG(2) << "Sending ACK for status update " << update
-              << " to " << master.get();
+    if (implicitAcknowledgements) {
+      // Note that we need to look at the volatile 'running' here
+      // so that we don't acknowledge the update if the driver was
+      // aborted during the processing of the update.
+      if (!running) {
+        VLOG(1) << "Not sending status update acknowledgment message because "
+                << "the driver is not running!";
+        return;
+      }
 
-      StatusUpdateAcknowledgementMessage message;
-      message.mutable_framework_id()->MergeFrom(framework.id());
-      message.mutable_slave_id()->MergeFrom(update.slave_id());
-      message.mutable_task_id()->MergeFrom(update.status().task_id());
-      message.set_uuid(update.uuid());
-      send(master.get(), message);
+      // Don't acknowledge updates created by the driver or master.
+      if (from != UPID() && pid != UPID()) {
+        // We drop updates while we're disconnected.
+        CHECK(connected);
+        CHECK_SOME(master);
+
+        VLOG(2) << "Sending ACK for status update " << update
+            << " to " << master.get();
+
+        StatusUpdateAcknowledgementMessage message;
+        message.mutable_framework_id()->MergeFrom(framework.id());
+        message.mutable_slave_id()->MergeFrom(update.slave_id());
+        message.mutable_task_id()->MergeFrom(update.status().task_id());
+        message.set_uuid(update.uuid());
+        send(master.get(), message);
+      }
     }
   }
 
@@ -1072,6 +1092,53 @@ protected:
     send(master.get(), message);
   }
 
+  void acknowledgeStatusUpdate(
+      const TaskStatus& status)
+  {
+    // The driver should abort before allowing an acknowledgement
+    // call when implicit acknowledgements are enabled. We further
+    // enforce that the driver is denying the call through this CHECK.
+    CHECK(!implicitAcknowledgements);
+
+    if (!connected) {
+      VLOG(1) << "Ignoring explicit status update acknowledgement"
+                 " because the driver is disconnected";
+      return;
+    }
+
+    CHECK_SOME(master);
+
+    // NOTE: By ignoring the volatile 'running' here, we ensure that
+    // all acknowledgements requested before the driver was stopped
+    // or aborted are processed. Any acknowledgement that is requested
+    // after the driver stops or aborts (running == false) will be
+    // dropped in the driver before reaching here.
+
+    // Only statuses with a 'uuid' and a 'slave_id' need to have
+    // acknowledgements sent to the master. Note that the driver
+    // ensures that master-generated and driver-generated updates
+    // will not have a 'uuid' set.
+    if (status.has_uuid() && status.has_slave_id()) {
+      VLOG(2) << "Sending ACK for status update " << status.uuid()
+              << " of task " << status.task_id()
+              << " on slave " << status.slave_id()
+              << " to " << master.get();
+
+      StatusUpdateAcknowledgementMessage message;
+      message.mutable_framework_id()->CopyFrom(framework.id());
+      message.mutable_slave_id()->CopyFrom(status.slave_id());
+      message.mutable_task_id()->CopyFrom(status.task_id());
+      message.set_uuid(status.uuid());
+      send(master.get(), message);
+    } else {
+      VLOG(2) << "Received ACK for status update"
+              << (status.has_uuid() ? " " + status.uuid() : "")
+              << " of task " << status.task_id()
+              << (status.has_slave_id()
+                  ? " on slave " + stringify(status.slave_id()) : "");
+    }
+  }
+
   void sendFrameworkMessage(const ExecutorID& executorId,
                             const SlaveID& slaveId,
                             const string& data)
@@ -1204,6 +1271,13 @@ private:
   hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
   hashmap<SlaveID, UPID> savedSlavePids;
 
+  // The driver optionally provides implicit acknowledgements
+  // for frameworks. If disabled, the framework must send its
+  // own acknowledgements through the driver, when the 'uuid'
+  // of the TaskStatus is set (which also implies the 'slave_id'
+  // is set).
+  bool implicitAcknowledgements;
+
   const Option<Credential> credential;
 
   Authenticatee* authenticatee;
@@ -1319,6 +1393,45 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     master(_master),
     process(NULL),
     status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(true),
+    credential(NULL),
+    schedulerId("scheduler-" + UUID::random().toString())
+{
+  initialize();
+}
+
+
+MesosSchedulerDriver::MesosSchedulerDriver(
+    Scheduler* _scheduler,
+    const FrameworkInfo& _framework,
+    const string& _master,
+    const Credential& _credential)
+  : detector(NULL),
+    scheduler(_scheduler),
+    framework(_framework),
+    master(_master),
+    process(NULL),
+    status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(true),
+    credential(new Credential(_credential)),
+    schedulerId("scheduler-" + UUID::random().toString())
+{
+  initialize();
+}
+
+
+MesosSchedulerDriver::MesosSchedulerDriver(
+    Scheduler* _scheduler,
+    const FrameworkInfo& _framework,
+    const string& _master,
+    bool _implicitAcknowlegements)
+  : detector(NULL),
+    scheduler(_scheduler),
+    framework(_framework),
+    master(_master),
+    process(NULL),
+    status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(_implicitAcknowlegements),
     credential(NULL),
     schedulerId("scheduler-" + UUID::random().toString())
 {
@@ -1326,12 +1439,11 @@ MesosSchedulerDriver::MesosSchedulerDriver(
 }
 
 
-// The implementation of this is same as the above constructor
-// except that the SchedulerProcess is passed the credential.
 MesosSchedulerDriver::MesosSchedulerDriver(
     Scheduler* _scheduler,
     const FrameworkInfo& _framework,
     const string& _master,
+    bool _implicitAcknowlegements,
     const Credential& _credential)
   : detector(NULL),
     scheduler(_scheduler),
@@ -1339,6 +1451,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     master(_master),
     process(NULL),
     status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(_implicitAcknowlegements),
     credential(new Credential(_credential)),
     schedulerId("scheduler-" + UUID::random().toString())
 {
@@ -1438,6 +1551,7 @@ Status MesosSchedulerDriver::start()
         scheduler,
         framework,
         None(),
+        implicitAcknowlegements,
         schedulerId,
         detector,
         flags,
@@ -1450,6 +1564,7 @@ Status MesosSchedulerDriver::start()
         scheduler,
         framework,
         cred,
+        implicitAcknowlegements,
         schedulerId,
         detector,
         flags,
@@ -1644,6 +1759,29 @@ Status MesosSchedulerDriver::reviveOffers()
 }
 
 
+Status MesosSchedulerDriver::acknowledgeStatusUpdate(
+    const TaskStatus& taskStatus)
+{
+  Lock lock(&mutex);
+
+  if (status != DRIVER_RUNNING) {
+    return status;
+  }
+
+  // TODO(bmahler): Should this use abort() instead?
+  if (implicitAcknowlegements) {
+    ABORT("Cannot call acknowledgeStatusUpdate:"
+          " Implicit acknowledgements are enabled");
+  }
+
+  CHECK(process != NULL);
+
+  dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+
+  return status;
+}
+
+
 Status MesosSchedulerDriver::sendFrameworkMessage(
     const ExecutorID& executorId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 5816569..23658c8 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -689,6 +689,7 @@ protected:
     update->mutable_status()->set_timestamp(message.update().timestamp());
 
     update->set_uuid(message.update().uuid());
+    update->mutable_status()->set_uuid(message.update().uuid());
 
     receive(from, event);
   }
@@ -761,6 +762,7 @@ protected:
     status->set_timestamp(Clock::now().secs());
 
     update->set_uuid(UUID::random().toBytes());
+    status->set_uuid(update->uuid());
 
     receive(None(), event);
   }