You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:51 UTC

[1/8] mesos git commit: Updated MockScheduler to pass implicit acknowledgements parameter.

Repository: mesos
Updated Branches:
  refs/heads/master 39c5e9624 -> 336997936


Updated MockScheduler to pass implicit acknowledgements parameter.

Review: https://reviews.apache.org/r/30974


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2626d69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2626d69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2626d69

Branch: refs/heads/master
Commit: a2626d693809da69e338e7cd4ad07a77ac2f21af
Parents: e4013c0
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:29:42 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 src/tests/mesos.hpp                | 34 ++++++++++++++++++++++++++-------
 src/tests/slave_recovery_tests.cpp |  2 +-
 2 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a2626d69/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 60c7004..fceef6c 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -521,28 +521,48 @@ class TestingMesosSchedulerDriver : public MesosSchedulerDriver
 public:
   TestingMesosSchedulerDriver(
       Scheduler* scheduler,
-      const FrameworkInfo& framework,
-      const Credential& credential,
       MasterDetector* _detector)
-    : MesosSchedulerDriver(scheduler, framework, "", credential)
+    : MesosSchedulerDriver(
+          scheduler,
+          DEFAULT_FRAMEWORK_INFO,
+          "",
+          true,
+          DEFAULT_CREDENTIAL)
   {
     detector = _detector;
   }
 
-  // A constructor that uses the DEFAULT_FRAMEWORK_INFO &
-  // DEFAULT_CREDENTIAL.
   TestingMesosSchedulerDriver(
       Scheduler* scheduler,
-      MasterDetector* _detector)
+      MasterDetector* _detector,
+      const FrameworkInfo& framework,
+      bool implicitAcknowledgements = true)
     : MesosSchedulerDriver(
           scheduler,
-          DEFAULT_FRAMEWORK_INFO,
+          framework,
           "",
+          implicitAcknowledgements,
           DEFAULT_CREDENTIAL)
   {
     detector = _detector;
   }
 
+  TestingMesosSchedulerDriver(
+      Scheduler* scheduler,
+      MasterDetector* _detector,
+      const FrameworkInfo& framework,
+      bool implicitAcknowledgements,
+      const Credential& credential)
+    : MesosSchedulerDriver(
+          scheduler,
+          framework,
+          "",
+          implicitAcknowledgements,
+          credential)
+  {
+    detector = _detector;
+  }
+
   ~TestingMesosSchedulerDriver()
   {
     // This is necessary because in the base class the detector is

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2626d69/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 8210c52..24c28af 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2745,7 +2745,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
   Owned<StandaloneMasterDetector> detector(
       new StandaloneMasterDetector(master.get()));
   TestingMesosSchedulerDriver driver(
-      &sched, frameworkInfo, DEFAULT_CREDENTIAL, detector.get());
+      &sched, detector.get(), frameworkInfo);
 
   EXPECT_CALL(sched, registered(_, _, _));
 


[5/8] mesos git commit: Added JNI construct for booleans.

Posted by bm...@apache.org.
Added JNI construct for booleans.

Review: https://reviews.apache.org/r/30970


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10269912
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10269912
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10269912

Branch: refs/heads/master
Commit: 102699121060a2e183df52ea0398ff4ebf45e57e
Parents: 39c5e96
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:23:04 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 src/java/jni/construct.cpp | 6 ++++++
 src/java/jni/construct.hpp | 1 +
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/10269912/src/java/jni/construct.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/construct.cpp b/src/java/jni/construct.cpp
index 0d0207f..e54c11e 100644
--- a/src/java/jni/construct.cpp
+++ b/src/java/jni/construct.cpp
@@ -50,6 +50,12 @@ T parse(const void* data, int size)
 }
 
 
+bool construct(JNIEnv* env, jboolean jbool)
+{
+  return jbool == JNI_TRUE;
+}
+
+
 template <>
 string construct(JNIEnv* env, jobject jobj)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/10269912/src/java/jni/construct.hpp
----------------------------------------------------------------------
diff --git a/src/java/jni/construct.hpp b/src/java/jni/construct.hpp
index a1907a1..f2a62ea 100644
--- a/src/java/jni/construct.hpp
+++ b/src/java/jni/construct.hpp
@@ -21,6 +21,7 @@
 
 #include <jni.h>
 
+bool construct(JNIEnv* env, jboolean jbool);
 
 template <typename T>
 T construct(JNIEnv* env, jobject jobj);


[6/8] mesos git commit: Updated test frameworks for explicit acknowledgements.

Posted by bm...@apache.org.
Updated test frameworks for explicit acknowledgements.

Review: https://reviews.apache.org/r/30976


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1aeec05
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1aeec05
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1aeec05

Branch: refs/heads/master
Commit: e1aeec052cf9c8cec1cf14f4c72c83e2d6b68f02
Parents: a2626d6
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:31:27 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:33 2015 -0800

----------------------------------------------------------------------
 src/examples/java/TestFramework.java  | 35 ++++++++++++++++++++-----
 src/examples/python/test_framework.py | 21 ++++++++++++---
 src/examples/test_framework.cpp       | 41 ++++++++++++++++++++++++------
 3 files changed, 78 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1aeec05/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index ce87de8..65ba9d9 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -31,11 +31,15 @@ import org.apache.mesos.Protos.*;
 
 public class TestFramework {
   static class TestScheduler implements Scheduler {
-    public TestScheduler(ExecutorInfo executor) {
-      this(executor, 5);
+    public TestScheduler(boolean implicitAcknowledgements,
+                         ExecutorInfo executor) {
+      this(implicitAcknowledgements, executor, 5);
     }
 
-    public TestScheduler(ExecutorInfo executor, int totalTasks) {
+    public TestScheduler(boolean implicitAcknowledgements,
+                         ExecutorInfo executor,
+                         int totalTasks) {
+      this.implicitAcknowledgements = implicitAcknowledgements;
       this.executor = executor;
       this.totalTasks = totalTasks;
     }
@@ -139,6 +143,10 @@ public class TestFramework {
                            " with message '" + status.getMessage() + "'");
         driver.abort();
       }
+
+      if (!implicitAcknowledgements) {
+        driver.acknowledgeStatusUpdate(status);
+      }
     }
 
     @Override
@@ -160,6 +168,7 @@ public class TestFramework {
       System.out.println("Error: " + message);
     }
 
+    private final boolean implicitAcknowledgements;
     private final ExecutorInfo executor;
     private final int totalTasks;
     private int launchedTasks = 0;
@@ -197,9 +206,16 @@ public class TestFramework {
       frameworkBuilder.setCheckpoint(true);
     }
 
+    boolean implicitAcknowledgements = true;
+
+    if (System.getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS") != null) {
+      System.out.println("Enabling explicit acknowledgements for status updates");
+      implicitAcknowledgements = false;
+    }
+
     Scheduler scheduler = args.length == 1
-        ? new TestScheduler(executor)
-        : new TestScheduler(executor, Integer.parseInt(args[1]));
+        ? new TestScheduler(implicitAcknowledgements, executor)
+        : new TestScheduler(implicitAcknowledgements, executor, Integer.parseInt(args[1]));
 
     MesosSchedulerDriver driver = null;
     if (System.getenv("MESOS_AUTHENTICATE") != null) {
@@ -222,11 +238,16 @@ public class TestFramework {
 
       frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL"));
 
-      driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], credential);
+      driver = new MesosSchedulerDriver(
+          scheduler,
+          frameworkBuilder.build(),
+          args[0],
+          implicitAcknowledgements,
+          credential);
     } else {
       frameworkBuilder.setPrincipal("test-framework-java");
 
-      driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
+      driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], implicitAcknowledgements);
     }
 
     int status = driver.run() == Status.DRIVER_STOPPED ? 0 : 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1aeec05/src/examples/python/test_framework.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_framework.py b/src/examples/python/test_framework.py
index aad6d77..2710614 100755
--- a/src/examples/python/test_framework.py
+++ b/src/examples/python/test_framework.py
@@ -30,7 +30,8 @@ TASK_CPUS = 1
 TASK_MEM = 128
 
 class TestScheduler(mesos.interface.Scheduler):
-    def __init__(self, executor):
+    def __init__(self, implicitAcknowledgements, executor):
+        self.implicitAcknowledgements = implicitAcknowledgements
         self.executor = executor
         self.taskData = {}
         self.tasksLaunched = 0
@@ -123,6 +124,11 @@ class TestScheduler(mesos.interface.Scheduler):
                 % (update.task_id.value, mesos_pb2.TaskState.Name(update.state), update.message)
             driver.abort()
 
+        # Explicitly acknowledge the update if implicit acknowledgements
+        # are not being used.
+        if not self.implicitAcknowledgements:
+            driver.acknowledgeStatusUpdate(update)
+
     def frameworkMessage(self, driver, executorId, slaveId, message):
         self.messagesReceived += 1
 
@@ -163,6 +169,11 @@ if __name__ == "__main__":
         print "Enabling checkpoint for the framework"
         framework.checkpoint = True
 
+    implicitAcknowledgements = 1
+    if os.getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS"):
+        print "Enabling explicit status update acknowledgements"
+        implicitAcknowledgements = 0
+
     if os.getenv("MESOS_AUTHENTICATE"):
         print "Enabling authentication for the framework"
 
@@ -181,17 +192,19 @@ if __name__ == "__main__":
         framework.principal = os.getenv("DEFAULT_PRINCIPAL")
 
         driver = mesos.native.MesosSchedulerDriver(
-            TestScheduler(executor),
+            TestScheduler(implicitAcknowledgements, executor),
             framework,
             sys.argv[1],
+            implicitAcknowledgements,
             credential)
     else:
         framework.principal = "test-framework-python"
 
         driver = mesos.native.MesosSchedulerDriver(
-            TestScheduler(executor),
+            TestScheduler(implicitAcknowledgements, executor),
             framework,
-            sys.argv[1])
+            sys.argv[1],
+            implicitAcknowledgements)
 
     status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1aeec05/src/examples/test_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp
index 0a4fde5..04468c2 100644
--- a/src/examples/test_framework.cpp
+++ b/src/examples/test_framework.cpp
@@ -56,8 +56,12 @@ const int32_t MEM_PER_TASK = 128;
 class TestScheduler : public Scheduler
 {
 public:
-  TestScheduler(const ExecutorInfo& _executor, const string& _role)
-    : executor(_executor),
+  TestScheduler(
+      bool _implicitAcknowledgements,
+      const ExecutorInfo& _executor,
+      const string& _role)
+    : implicitAcknowledgements(_implicitAcknowledgements),
+      executor(_executor),
       role(_role),
       tasksLaunched(0),
       tasksFinished(0),
@@ -127,8 +131,9 @@ public:
 
     cout << "Task " << taskId << " is in state " << status.state() << endl;
 
-    if (status.state() == TASK_FINISHED)
+    if (status.state() == TASK_FINISHED) {
       tasksFinished++;
+    }
 
     if (status.state() == TASK_LOST ||
         status.state() == TASK_KILLED ||
@@ -142,8 +147,13 @@ public:
       driver->abort();
     }
 
-    if (tasksFinished == totalTasks)
+    if (!implicitAcknowledgements) {
+      driver->acknowledgeStatusUpdate(status);
+    }
+
+    if (tasksFinished == totalTasks) {
       driver->stop();
+    }
   }
 
   virtual void frameworkMessage(SchedulerDriver* driver,
@@ -164,6 +174,7 @@ public:
   }
 
 private:
+  const bool implicitAcknowledgements;
   const ExecutorInfo executor;
   string role;
   int tasksLaunched;
@@ -223,8 +234,6 @@ int main(int argc, char** argv)
   executor.set_name("Test Executor (C++)");
   executor.set_source("cpp_test");
 
-  TestScheduler scheduler(executor, role);
-
   FrameworkInfo framework;
   framework.set_user(""); // Have Mesos fill in the current user.
   framework.set_name("Test Framework (C++)");
@@ -235,7 +244,16 @@ int main(int argc, char** argv)
         numify<bool>(os::getenv("MESOS_CHECKPOINT")).get());
   }
 
+  bool implicitAcknowledgements = true;
+  if (os::hasenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS")) {
+    cout << "Enabling explicit acknowledgements for status updates" << endl;
+
+    implicitAcknowledgements = false;
+  }
+
   MesosSchedulerDriver* driver;
+  TestScheduler scheduler(implicitAcknowledgements, executor, role);
+
   if (os::hasenv("MESOS_AUTHENTICATE")) {
     cout << "Enabling authentication for the framework" << endl;
 
@@ -254,12 +272,19 @@ int main(int argc, char** argv)
     framework.set_principal(getenv("DEFAULT_PRINCIPAL"));
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, master.get(), credential);
+        &scheduler,
+        framework,
+        master.get(),
+        implicitAcknowledgements,
+        credential);
   } else {
     framework.set_principal("test-framework-cpp");
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, master.get());
+        &scheduler,
+        framework,
+        master.get(),
+        implicitAcknowledgements);
   }
 
   int status = driver->run() == DRIVER_STOPPED ? 0 : 1;


[3/8] mesos git commit: Introduced explicit status update acknowledgements on the driver.

Posted by bm...@apache.org.
Introduced explicit status update acknowledgements on the driver.

Review: https://reviews.apache.org/r/30971


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ff4397a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ff4397a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ff4397a6

Branch: refs/heads/master
Commit: ff4397a6c278d32b3523ed9cae8d43b8a7772278
Parents: 1026991
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:25:51 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto   |  10 ++
 include/mesos/scheduler.hpp |  44 ++++++++-
 src/sched/sched.cpp         | 192 +++++++++++++++++++++++++++++++++------
 src/scheduler/scheduler.cpp |   2 +
 4 files changed, 217 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 507845c..14ff7f9 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -784,6 +784,16 @@ message TaskStatus {
   optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave.
   optional double timestamp = 6;
 
+  // Statuses that are delivered reliably to the scheduler will
+  // include a 'uuid'. The status is considered delivered once
+  // it is acknowledged by the scheduler. Schedulers can choose
+  // to either explicitly acknowledge statuses or let the scheduler
+  // driver implicitly acknowledge (default).
+  //
+  // TODO(bmahler): This is currently overwritten in the scheduler
+  // driver, even if executors set this.
+  optional bytes uuid = 11;
+
   // Describes whether the task has been determined to be healthy
   // (true) or unhealthy (false) according to the HealthCheck field in
   // the command info.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 31256c1..f24ec80 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -123,12 +123,15 @@ public:
 
   // Invoked when the status of a task has changed (e.g., a slave is
   // lost and so the task is lost, a task finishes and an executor
-  // sends a status update saying so, etc). Note that returning from
-  // this callback _acknowledges_ receipt of this status update! If
-  // for whatever reason the scheduler aborts during this callback (or
+  // sends a status update saying so, etc). If implicit
+  // acknowledgements are being used, then returning from this
+  // callback _acknowledges_ receipt of this status update! If for
+  // whatever reason the scheduler aborts during this callback (or
   // the process exits) another status update will be delivered (note,
   // however, that this is currently not true if the slave sending the
-  // status update is lost/fails during that time).
+  // status update is lost/fails during that time). If explicit
+  // acknowledgements are in use, the scheduler must acknowledge this
+  // status on the driver.
   virtual void statusUpdate(
       SchedulerDriver* driver,
       const TaskStatus& status) = 0;
@@ -269,6 +272,14 @@ public:
   // those filtered slaves.
   virtual Status reviveOffers() = 0;
 
+  // Acknowledges the status update. This should only be called
+  // once the status update is processed durably by the scheduler.
+  // Note that explicit acknowledgements must be requested via the
+  // constructor argument, otherwise a call to this method will
+  // cause the driver to crash.
+  virtual Status acknowledgeStatusUpdate(
+      const TaskStatus& status) = 0;
+
   // Sends a message from the framework to one of its executors. These
   // messages are best effort; do not expect a framework message to be
   // retransmitted in any reliable fashion.
@@ -348,6 +359,26 @@ public:
       const std::string& master,
       const Credential& credential);
 
+  // These constructors are the same as the above two, but allow
+  // the framework to specify whether implicit or explicit
+  // acknowledgements are desired. See statusUpdate() for the
+  // details about explicit acknowledgements.
+  //
+  // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
+  // these new constructors are exposed.
+  MesosSchedulerDriver(
+      Scheduler* scheduler,
+      const FrameworkInfo& framework,
+      const std::string& master,
+      bool implicitAcknowledgements);
+
+  MesosSchedulerDriver(
+      Scheduler* scheduler,
+      const FrameworkInfo& framework,
+      const std::string& master,
+      bool implicitAcknowlegements,
+      const Credential& credential);
+
   // This destructor will block indefinitely if
   // MesosSchedulerDriver::start was invoked successfully (possibly
   // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
@@ -389,6 +420,9 @@ public:
 
   virtual Status reviveOffers();
 
+  virtual Status acknowledgeStatusUpdate(
+      const TaskStatus& status);
+
   virtual Status sendFrameworkMessage(
       const ExecutorID& executorId,
       const SlaveID& slaveId,
@@ -423,6 +457,8 @@ private:
   // Current status of the driver.
   Status status;
 
+  const bool implicitAcknowlegements;
+
   const Credential* credential;
 
   // Scheduler process ID.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index ea7e447..280eaeb 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -54,6 +54,7 @@
 #include <process/metrics/gauge.hpp>
 #include <process/metrics/metrics.hpp>
 
+#include <stout/abort.hpp>
 #include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
@@ -115,6 +116,7 @@ public:
                    Scheduler* _scheduler,
                    const FrameworkInfo& _framework,
                    const Option<Credential>& _credential,
+                   bool _implicitAcknowledgements,
                    const string& schedulerId,
                    MasterDetector* _detector,
                    const scheduler::Flags& _flags,
@@ -142,6 +144,7 @@ public:
       running(true),
       detector(_detector),
       flags(_flags),
+      implicitAcknowledgements(_implicitAcknowledgements),
       credential(_credential),
       authenticatee(NULL),
       authenticating(None()),
@@ -647,8 +650,6 @@ protected:
       const StatusUpdate& update,
       const UPID& pid)
   {
-    const TaskStatus& status = update.status();
-
     if (!running) {
       VLOG(1) << "Ignoring task status update message because "
               << "the driver is not running!";
@@ -686,6 +687,23 @@ protected:
     // multiple times (of course, if a scheduler re-uses a TaskID,
     // that could be bad.
 
+    TaskStatus status = update.status();
+
+    // If the update is driver-generated or master-generated, it
+    // does not require acknowledgement and so we unset the 'uuid'
+    // field of TaskStatus. Otherwise, we overwrite the field to
+    // ensure that a 0.22.0 scheduler driver supports explicit
+    // acknowledgements, even if running against a 0.21.0 cluster.
+    //
+    // TODO(bmahler): Update the slave / executor driver to ensure
+    // that 'uuid' is set accurately by the time it reaches the
+    // scheduler driver. This will be required for pure bindings.
+    if (from == UPID() || pid == UPID()) {
+      status.clear_uuid();
+    } else {
+      status.set_uuid(update.uuid());
+    }
+
     Stopwatch stopwatch;
     if (FLAGS_v >= 1) {
       stopwatch.start();
@@ -695,30 +713,32 @@ protected:
 
     VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();
 
-    // Note that we need to look at the volatile 'aborted' here to
-    // so that we don't acknowledge the update if the driver was
-    // aborted during the processing of the update.
-    if (!running) {
-      VLOG(1) << "Not sending status update acknowledgment message because "
-              << "the driver is not running!";
-      return;
-    }
-
-    // Don't acknowledge updates created by the driver or master.
-    if (from != UPID() && pid != UPID()) {
-      // We drop updates while we're disconnected.
-      CHECK(connected);
-      CHECK_SOME(master);
-
-      VLOG(2) << "Sending ACK for status update " << update
-              << " to " << master.get();
+    if (implicitAcknowledgements) {
+      // Note that we need to look at the volatile 'running' here
+      // so that we don't acknowledge the update if the driver was
+      // aborted during the processing of the update.
+      if (!running) {
+        VLOG(1) << "Not sending status update acknowledgment message because "
+                << "the driver is not running!";
+        return;
+      }
 
-      StatusUpdateAcknowledgementMessage message;
-      message.mutable_framework_id()->MergeFrom(framework.id());
-      message.mutable_slave_id()->MergeFrom(update.slave_id());
-      message.mutable_task_id()->MergeFrom(update.status().task_id());
-      message.set_uuid(update.uuid());
-      send(master.get(), message);
+      // Don't acknowledge updates created by the driver or master.
+      if (from != UPID() && pid != UPID()) {
+        // We drop updates while we're disconnected.
+        CHECK(connected);
+        CHECK_SOME(master);
+
+        VLOG(2) << "Sending ACK for status update " << update
+            << " to " << master.get();
+
+        StatusUpdateAcknowledgementMessage message;
+        message.mutable_framework_id()->MergeFrom(framework.id());
+        message.mutable_slave_id()->MergeFrom(update.slave_id());
+        message.mutable_task_id()->MergeFrom(update.status().task_id());
+        message.set_uuid(update.uuid());
+        send(master.get(), message);
+      }
     }
   }
 
@@ -1072,6 +1092,53 @@ protected:
     send(master.get(), message);
   }
 
+  void acknowledgeStatusUpdate(
+      const TaskStatus& status)
+  {
+    // The driver should abort before allowing an acknowledgement
+    // call when implicit acknowledgements are enabled. We further
+    // enforce that the driver is denying the call through this CHECK.
+    CHECK(!implicitAcknowledgements);
+
+    if (!connected) {
+      VLOG(1) << "Ignoring explicit status update acknowledgement"
+                 " because the driver is disconnected";
+      return;
+    }
+
+    CHECK_SOME(master);
+
+    // NOTE: By ignoring the volatile 'running' here, we ensure that
+    // all acknowledgements requested before the driver was stopped
+    // or aborted are processed. Any acknowledgement that is requested
+    // after the driver stops or aborts (running == false) will be
+    // dropped in the driver before reaching here.
+
+    // Only statuses with a 'uuid' and a 'slave_id' need to have
+    // acknowledgements sent to the master. Note that the driver
+    // ensures that master-generated and driver-generated updates
+    // will not have a 'uuid' set.
+    if (status.has_uuid() && status.has_slave_id()) {
+      VLOG(2) << "Sending ACK for status update " << status.uuid()
+              << " of task " << status.task_id()
+              << " on slave " << status.slave_id()
+              << " to " << master.get();
+
+      StatusUpdateAcknowledgementMessage message;
+      message.mutable_framework_id()->CopyFrom(framework.id());
+      message.mutable_slave_id()->CopyFrom(status.slave_id());
+      message.mutable_task_id()->CopyFrom(status.task_id());
+      message.set_uuid(status.uuid());
+      send(master.get(), message);
+    } else {
+      VLOG(2) << "Received ACK for status update"
+              << (status.has_uuid() ? " " + status.uuid() : "")
+              << " of task " << status.task_id()
+              << (status.has_slave_id()
+                  ? " on slave " + stringify(status.slave_id()) : "");
+    }
+  }
+
   void sendFrameworkMessage(const ExecutorID& executorId,
                             const SlaveID& slaveId,
                             const string& data)
@@ -1204,6 +1271,13 @@ private:
   hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
   hashmap<SlaveID, UPID> savedSlavePids;
 
+  // The driver optionally provides implicit acknowledgements
+  // for frameworks. If disabled, the framework must send its
+  // own acknowledgements through the driver, when the 'uuid'
+  // of the TaskStatus is set (which also implies the 'slave_id'
+  // is set).
+  bool implicitAcknowledgements;
+
   const Option<Credential> credential;
 
   Authenticatee* authenticatee;
@@ -1319,6 +1393,45 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     master(_master),
     process(NULL),
     status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(true),
+    credential(NULL),
+    schedulerId("scheduler-" + UUID::random().toString())
+{
+  initialize();
+}
+
+
+MesosSchedulerDriver::MesosSchedulerDriver(
+    Scheduler* _scheduler,
+    const FrameworkInfo& _framework,
+    const string& _master,
+    const Credential& _credential)
+  : detector(NULL),
+    scheduler(_scheduler),
+    framework(_framework),
+    master(_master),
+    process(NULL),
+    status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(true),
+    credential(new Credential(_credential)),
+    schedulerId("scheduler-" + UUID::random().toString())
+{
+  initialize();
+}
+
+
+MesosSchedulerDriver::MesosSchedulerDriver(
+    Scheduler* _scheduler,
+    const FrameworkInfo& _framework,
+    const string& _master,
+    bool _implicitAcknowlegements)
+  : detector(NULL),
+    scheduler(_scheduler),
+    framework(_framework),
+    master(_master),
+    process(NULL),
+    status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(_implicitAcknowlegements),
     credential(NULL),
     schedulerId("scheduler-" + UUID::random().toString())
 {
@@ -1326,12 +1439,11 @@ MesosSchedulerDriver::MesosSchedulerDriver(
 }
 
 
-// The implementation of this is same as the above constructor
-// except that the SchedulerProcess is passed the credential.
 MesosSchedulerDriver::MesosSchedulerDriver(
     Scheduler* _scheduler,
     const FrameworkInfo& _framework,
     const string& _master,
+    bool _implicitAcknowlegements,
     const Credential& _credential)
   : detector(NULL),
     scheduler(_scheduler),
@@ -1339,6 +1451,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     master(_master),
     process(NULL),
     status(DRIVER_NOT_STARTED),
+    implicitAcknowlegements(_implicitAcknowlegements),
     credential(new Credential(_credential)),
     schedulerId("scheduler-" + UUID::random().toString())
 {
@@ -1438,6 +1551,7 @@ Status MesosSchedulerDriver::start()
         scheduler,
         framework,
         None(),
+        implicitAcknowlegements,
         schedulerId,
         detector,
         flags,
@@ -1450,6 +1564,7 @@ Status MesosSchedulerDriver::start()
         scheduler,
         framework,
         cred,
+        implicitAcknowlegements,
         schedulerId,
         detector,
         flags,
@@ -1644,6 +1759,29 @@ Status MesosSchedulerDriver::reviveOffers()
 }
 
 
+Status MesosSchedulerDriver::acknowledgeStatusUpdate(
+    const TaskStatus& taskStatus)
+{
+  Lock lock(&mutex);
+
+  if (status != DRIVER_RUNNING) {
+    return status;
+  }
+
+  // TODO(bmahler): Should this use abort() instead?
+  if (implicitAcknowlegements) {
+    ABORT("Cannot call acknowledgeStatusUpdate:"
+          " Implicit acknowledgements are enabled");
+  }
+
+  CHECK(process != NULL);
+
+  dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+
+  return status;
+}
+
+
 Status MesosSchedulerDriver::sendFrameworkMessage(
     const ExecutorID& executorId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff4397a6/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 5816569..23658c8 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -689,6 +689,7 @@ protected:
     update->mutable_status()->set_timestamp(message.update().timestamp());
 
     update->set_uuid(message.update().uuid());
+    update->mutable_status()->set_uuid(message.update().uuid());
 
     receive(from, event);
   }
@@ -761,6 +762,7 @@ protected:
     status->set_timestamp(Clock::now().secs());
 
     update->set_uuid(UUID::random().toBytes());
+    status->set_uuid(update->uuid());
 
     receive(None(), event);
   }


[7/8] mesos git commit: Added explicit acknowledgement integration tests.

Posted by bm...@apache.org.
Added explicit acknowledgement integration tests.

Review: https://reviews.apache.org/r/30977


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a7df8d9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a7df8d9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a7df8d9b

Branch: refs/heads/master
Commit: a7df8d9b91bd5c797b5cf643de7670a99400a217
Parents: e1aeec0
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:32:08 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:33 2015 -0800

----------------------------------------------------------------------
 src/tests/scheduler_tests.cpp | 196 +++++++++++++++++++++++++++++++++++++
 1 file changed, 196 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a7df8d9b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 9071a85..fb81e03 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -18,6 +18,7 @@
 
 #include <gmock/gmock.h>
 
+#include <string>
 #include <queue>
 #include <vector>
 
@@ -68,6 +69,7 @@ using process::http::OK;
 
 using process::metrics::internal::MetricsProcess;
 
+using std::string;
 using std::queue;
 using std::vector;
 
@@ -317,3 +319,197 @@ TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
 
   Shutdown();
 }
+
+
+// Ensures that when a scheduler enables explicit acknowledgements
+// on the driver, there are no implicit acknowledgements sent, and
+// the call to 'acknowledgeStatusUpdate' sends the ack to the master.
+TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Ensure no status update acknowledgements are sent from the driver
+  // to the master until the explicit acknowledgement is sent.
+  EXPECT_NO_FUTURE_PROTOBUFS(
+      StatusUpdateAcknowledgementMessage(), _ , master.get());
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.start();
+
+  AWAIT_READY(status);
+
+  // Settle the clock to ensure driver finishes processing the status
+  // update, we want to ensure that no implicit acknowledgement gets
+  // sent.
+  Clock::pause();
+  Clock::settle();
+
+  // Now send the acknowledgement.
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _ , master.get());
+
+  driver.acknowledgeStatusUpdate(status.get());
+
+  AWAIT_READY(acknowledgement);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test ensures that when explicit acknowledgements are enabled,
+// acknowledgements for master-generated updates are dropped by the
+// driver. We test this by creating an invalid task that uses no
+// resources.
+TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Ensure no status update acknowledgements are sent to the master.
+  EXPECT_NO_FUTURE_PROTOBUFS(
+      StatusUpdateAcknowledgementMessage(), _ , master.get());
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Launch a task using no resources.
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_ERROR, status.get().state());
+  ASSERT_EQ(TaskStatus::SOURCE_MASTER, status.get().source());
+  ASSERT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+
+  // Now send the acknowledgement.
+  driver.acknowledgeStatusUpdate(status.get());
+
+  // Settle the clock to ensure driver processes the acknowledgement,
+  // which should get dropped due to having come from the master.
+  Clock::pause();
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test ensures that the driver handles an empty slave id
+// in an acknowledgement message by dropping it. The driver will
+// log an error in this case (but we don't test for that). We
+// generate a status with no slave id by performing reconciliation.
+TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  // Ensure no status update acknowledgements are sent to the master.
+  EXPECT_NO_FUTURE_PROTOBUFS(
+      StatusUpdateAcknowledgementMessage(), _ , master.get());
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  // Perform reconciliation without using a slave id.
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->set_value("foo");
+  status.set_state(TASK_RUNNING);
+
+  statuses.push_back(status);
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(update);
+  ASSERT_EQ(TASK_LOST, update.get().state());
+  ASSERT_EQ(TaskStatus::SOURCE_MASTER, update.get().source());
+  ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason());
+  ASSERT_FALSE(update.get().has_slave_id());
+
+  // Now send the acknowledgement.
+  driver.acknowledgeStatusUpdate(update.get());
+
+  // Settle the clock to ensure driver processes the acknowledgement,
+  // which should get dropped due to the missing slave id.
+  Clock::pause();
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}


[4/8] mesos git commit: Updated Python bindings for explicit acknowledgements.

Posted by bm...@apache.org.
Updated Python bindings for explicit acknowledgements.

Review: https://reviews.apache.org/r/30973


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4013c0a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4013c0a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4013c0a

Branch: refs/heads/master
Commit: e4013c0ac158c4e2851bedd662f6b48ea503b80c
Parents: 28d5f93
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:26:44 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 .../interface/src/mesos/interface/__init__.py   | 28 +++++++---
 .../native/mesos_scheduler_driver_impl.cpp      | 54 ++++++++++++++++++--
 .../native/mesos_scheduler_driver_impl.hpp      |  4 ++
 3 files changed, 75 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/interface/src/mesos/interface/__init__.py
----------------------------------------------------------------------
diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 8af649c..f3d96a4 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -87,14 +87,17 @@ class Scheduler(object):
 
   def statusUpdate(self, driver, status):
     """
-      Invoked when the status of a task has changed (e.g., a slave is lost
-      and so the task is lost, a task finishes and an executor sends a
-      status update saying so, etc.) Note that returning from this callback
-      acknowledges receipt of this status update.  If for whatever reason
-      the scheduler aborts during this callback (or the process exits)
-      another status update will be delivered.  Note, however, that this is
-      currently not true if the slave sending the status update is lost or
-      fails during that time.
+      Invoked when the status of a task has changed (e.g., a slave is
+      lost and so the task is lost, a task finishes and an executor
+      sends a status update saying so, etc). If implicit
+      acknowledgements are being used, then returning from this
+      callback _acknowledges_ receipt of this status update! If for
+      whatever reason the scheduler aborts during this callback (or
+      the process exits) another status update will be delivered (note,
+      however, that this is currently not true if the slave sending the
+      status update is lost/fails during that time). If explicit
+      acknowledgements are in use, the scheduler must acknowledge this
+      status on the driver.
     """
 
   def frameworkMessage(self, driver, executorId, slaveId, message):
@@ -214,6 +217,15 @@ class SchedulerDriver(object):
       those filtered slaves.
     """
 
+  def acknowledgeStatusUpdate(self, status):
+    """
+      Acknowledges the status update. This should only be called
+      once the status update is processed durably by the scheduler.
+      Note that explicit acknowledgements must be requested via the
+      constructor argument, otherwise a call to this method will
+      cause the driver to crash.
+    """
+
   def sendFrameworkMessage(self, executorId, slaveId, data):
     """
       Sends a message from the framework to one of its executors. These

http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
----------------------------------------------------------------------
diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
index 1badf55..bb18845 100644
--- a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
+++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
@@ -138,6 +138,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
     METH_NOARGS,
     "Remove all filters and ask Mesos for new offers"
   },
+  { "acknowledgeStatusUpdate",
+    (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
+    METH_VARARGS,
+    "Acknowledge a status update"
+  },
   { "sendFrameworkMessage",
     (PyCFunction) MesosSchedulerDriverImpl_sendFrameworkMessage,
     METH_VARARGS,
@@ -178,13 +183,22 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
                                   PyObject* args,
                                   PyObject* kwds)
 {
+  // Note: We use an integer for 'implicitAcknowledgements' because
+  // it is the recommended way to pass booleans through CPython.
   PyObject* schedulerObj = NULL;
   PyObject* frameworkObj = NULL;
   const char* master;
+  int implicitAcknowledgements;
   PyObject* credentialObj = NULL;
 
   if (!PyArg_ParseTuple(
-      args, "OOs|O", &schedulerObj, &frameworkObj, &master, &credentialObj)) {
+      args,
+      "OOs|iO",
+      &schedulerObj,
+      &frameworkObj,
+      &master,
+      &implicitAcknowledgements,
+      &credentialObj)) {
     return -1;
   }
 
@@ -227,10 +241,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
 
   if (credentialObj != NULL) {
     self->driver = new MesosSchedulerDriver(
-        self->proxyScheduler, framework, master, credential);
+        self->proxyScheduler,
+        framework,
+        master,
+        implicitAcknowledgements != 0,
+        credential);
   } else {
     self->driver = new MesosSchedulerDriver(
-        self->proxyScheduler, framework, master);
+        self->proxyScheduler,
+        framework,
+        master,
+        implicitAcknowledgements != 0);
   }
 
   return 0;
@@ -549,6 +570,33 @@ PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self)
 }
 
 
+PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
+{
+  if (self->driver == NULL) {
+    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL");
+    return NULL;
+  }
+
+  PyObject* taskStatusObj = NULL;
+  TaskStatus taskStatus;
+
+  if (!PyArg_ParseTuple(args, "O", &taskStatusObj)) {
+    return NULL;
+  }
+
+  if (!readPythonProtobuf(taskStatusObj, &taskStatus)) {
+    PyErr_Format(PyExc_Exception, "Could not deserialize Python TaskStatus");
+    return NULL;
+  }
+
+  Status status = self->driver->acknowledgeStatusUpdate(taskStatus);
+
+  return PyInt_FromLong(status); // Sets exception if creating long fails.
+}
+
+
 PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
     MesosSchedulerDriverImpl* self,
     PyObject* args)

http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
----------------------------------------------------------------------
diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
index d15dfb9..a698000 100644
--- a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
+++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
@@ -111,6 +111,10 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(
 
 PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self);
 
+PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args);
+
 PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
     MesosSchedulerDriverImpl* self,
     PyObject* args);


[8/8] mesos git commit: Updated documentation for explicit acknowledgement API change.

Posted by bm...@apache.org.
Updated documentation for explicit acknowledgement API change.

Review: https://reviews.apache.org/r/30978


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/33699793
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/33699793
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/33699793

Branch: refs/heads/master
Commit: 336997936f1ee49f66a2b7e1ab7ed97b4bf082ae
Parents: a7df8d9
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:34:40 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:33 2015 -0800

----------------------------------------------------------------------
 CHANGELOG                               | 18 ++++++++++++++----
 docs/app-framework-development-guide.md | 11 +++++++----
 docs/upgrades.md                        | 14 +++++++++++++-
 3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/33699793/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 966661a..2a54f08 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,11 +1,21 @@
 (WIP) Release Notes - Mesos - Version 0.22.0
 --------------------------------------
 
+This release contains several new features:
+
+* Support for explicitly sending status update acknowledgements from
+  schedulers; refer to the upgrades document for upgrading schedulers.
+
 * API Changes:
-  * [MESOS-1143] - TASK_ERROR is now sent instead of TASK_LOST when rescheduling a task should not be attempted.
-  * [MESOS-2086] - Update messages.proto to use a raw bytestream instead of a string for AuthenticationStartMessage.
-  * [MESOS-2322] - All arguments can now read their values from a file, just specify
-    --name=file://path/to/file.
+  * [MESOS-1143] - TASK_ERROR is now sent instead of TASK_LOST when rescheduling
+                   a task should not be attempted.
+  * [MESOS-2086] - Update messages.proto to use a raw bytestream instead of a
+                   string for AuthenticationStartMessage.
+  * [MESOS-2322] - All arguments can now read their values from a file, just
+                   specify --name=file://path/to/file.
+  * [MESOS-2347] - The C++/Java/Python APIs have been updated to provide the
+                   ability for schedulers to explicitly send acknowledgements.
+                   TaskStatus now includes a UUID to enable this.
 
 * Deprecations:
   * [MESOS-2058] - Deprecate stats.json endpoints for Master and Slave.

http://git-wip-us.apache.org/repos/asf/mesos/blob/33699793/docs/app-framework-development-guide.md
----------------------------------------------------------------------
diff --git a/docs/app-framework-development-guide.md b/docs/app-framework-development-guide.md
index e6247b2..dd7603d 100644
--- a/docs/app-framework-development-guide.md
+++ b/docs/app-framework-development-guide.md
@@ -79,12 +79,15 @@ Declared in `MESOS_HOME/include/mesos/scheduler.hpp`
   /**
    * Invoked when the status of a task has changed (e.g., a slave is
    * lost and so the task is lost, a task finishes and an executor
-   * sends a status update saying so, etc). Note that returning from
-   * this callback _acknowledges_ receipt of this status update! If
-   * for whatever reason the scheduler aborts during this callback (or
+   * sends a status update saying so, etc). If implicit
+   * acknowledgements are being used, then returning from this
+   * callback _acknowledges_ receipt of this status update! If for
+   * whatever reason the scheduler aborts during this callback (or
    * the process exits) another status update will be delivered (note,
    * however, that this is currently not true if the slave sending the
-   * status update is lost/fails during that time).
+   * status update is lost/fails during that time). If explicit
+   * acknowledgements are in use, the scheduler must acknowledge this
+   * status on the driver.
    */
   virtual void statusUpdate(SchedulerDriver* driver,
                             const TaskStatus& status) = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/33699793/docs/upgrades.md
----------------------------------------------------------------------
diff --git a/docs/upgrades.md b/docs/upgrades.md
index 168e761..07d19f9 100644
--- a/docs/upgrades.md
+++ b/docs/upgrades.md
@@ -8,6 +8,8 @@ This document serves as a guide for users who wish to upgrade an existing mesos
 
 ## (WIP) Upgrading from 0.21.x to 0.22.x
 
+**NOTE** The C++/Java/Python scheduler bindings have been updated. In particular, the driver can be constructed with an additional argument that specifies whether to use implicit driver acknowledgements. In `statusUpdate`, the `TaskStatus` now includes a UUID to make explicit acknowledgements possible.
+
 **NOTE**: The Authentication API has changed slightly in this release to support additional authentication mechanisms. The change from 'string' to 'bytes' for AuthenticationStartMessage.data has no impact on C++ or the over-the-wire representation, so it only impacts pure language bindings for languages like Java and Python that use different types for UTF-8 strings vs. byte arrays.
 
 ```
@@ -17,7 +19,17 @@ message AuthenticationStartMessage {
 }
 ```
 
-All Mesos arguments can now be passed using file:// to read them out of a file (either an absolute or relative path). The --credentials, --whitelist, and any flags that expect JSON backed arguments (such as --modules) behave as before, although support for just passing a absolute path for any JSON flags rather than file:// has been deprecated and will produce a warning (and the absolute path behavior will be removed in a future release).
+**NOTE** All Mesos arguments can now be passed using file:// to read them out of a file (either an absolute or relative path). The --credentials, --whitelist, and any flags that expect JSON backed arguments (such as --modules) behave as before, although support for just passing an absolute path for any JSON flags rather than file:// has been deprecated and will produce a warning (and the absolute path behavior will be removed in a future release).
+
+In order to upgrade a running cluster:
+
+* Install the new master binaries and restart the masters.
+* Install the new slave binaries and restart the slaves.
+* Upgrade the schedulers:
+  * For Java schedulers, link the new native library against the new JAR. The JAR contains API changes per the **NOTE** above. A 0.21.0 JAR will work with a 0.22.0 libmesos. A 0.22.0 JAR will work with a 0.21.0 libmesos if explicit acks are not being used. 0.22.0 and 0.21.0 are inter-operable at the protocol level between the master and the scheduler.
+  * For Python schedulers, upgrade to use a 0.22.0 egg. If constructing `MesosSchedulerDriverImpl` with `Credentials`, your code must be updated to pass the `implicitAcknowledgements` argument before `Credentials`. You may run a 0.21.0 Python scheduler against a 0.22.0 master, and vice versa.
+* Restart the schedulers.
+* Upgrade the executors by linking the latest native library / jar / egg.
 
 ## Upgrading from 0.20.x to 0.21.x
 


[2/8] mesos git commit: Updated Java bindings for explicit acknowledgements.

Posted by bm...@apache.org.
Updated Java bindings for explicit acknowledgements.

Review: https://reviews.apache.org/r/30972


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28d5f932
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28d5f932
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28d5f932

Branch: refs/heads/master
Commit: 28d5f9326ef2b7c6f2d2b5ece351d4ca09608100
Parents: ff4397a
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:26:30 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 .../org_apache_mesos_MesosSchedulerDriver.cpp   | 52 ++++++++++-
 .../org/apache/mesos/MesosSchedulerDriver.java  | 94 +++++++++++++++++++-
 src/java/src/org/apache/mesos/Scheduler.java    | 11 ++-
 .../src/org/apache/mesos/SchedulerDriver.java   | 15 ++++
 4 files changed, 162 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 3498930..4f0dad7 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -283,8 +283,8 @@ void JNIScheduler::statusUpdate(SchedulerDriver* driver,
   // scheduler.statusUpdate(driver, status);
   jmethodID statusUpdate =
     env->GetMethodID(clazz, "statusUpdate",
-		     "(Lorg/apache/mesos/SchedulerDriver;"
-		     "Lorg/apache/mesos/Protos$TaskStatus;)V");
+                     "(Lorg/apache/mesos/SchedulerDriver;"
+                     "Lorg/apache/mesos/Protos$TaskStatus;)V");
 
   jobject jstatus = convert<TaskStatus>(env, status);
 
@@ -498,13 +498,31 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_MesosSchedulerDriver_initialize
   jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
   jobject jmaster = env->GetObjectField(thiz, master);
 
+  // NOTE: Older versions (< 0.22.0) of MesosSchedulerDriver.java
+  // do not have the 'implicitAcknowledgements' field, so when None()
+  // is returned we default to the old behavior: implicit
+  // acknowledgements.
+  Result<jfieldID> implicitAcknowledgements =
+    getFieldID(env, clazz, "implicitAcknowledgements", "Z");
+
+  if (implicitAcknowledgements.isError()) {
+    return; // Exception has been thrown.
+  }
+
+  // Default to implicit acknowledgements, as done before 0.22.0.
+  jboolean jimplicitAcknowledgements = JNI_TRUE;
+  if (implicitAcknowledgements.isSome()) {
+    jimplicitAcknowledgements =
+      env->GetBooleanField(thiz, implicitAcknowledgements.get());
+  }
+
   // Get out the Credential passed into the constructor.
   // NOTE: Older versions (< 0.15.0) of MesosSchedulerDriver do not set
   // 'credential' field. To be backwards compatible we should safely
   // handle this case.
   Result<jfieldID> credential = getFieldID(env, clazz, "credential", "Lorg/apache/mesos/Protos$Credential;");
   if (credential.isError()) {
-    return;
+    return; // Exception has been thrown.
   }
 
   jobject jcredential = NULL;
@@ -520,12 +538,14 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_MesosSchedulerDriver_initialize
         scheduler,
         construct<FrameworkInfo>(env, jframework),
         construct<string>(env, jmaster),
+        construct(env, jimplicitAcknowledgements),
         construct<Credential>(env, jcredential));
   } else {
     driver = new MesosSchedulerDriver(
        scheduler,
        construct<FrameworkInfo>(env, jframework),
-       construct<string>(env, jmaster));
+       construct<string>(env, jmaster),
+       construct(env, jimplicitAcknowledgements));
   }
 
   // Initialize the __driver variable
@@ -648,6 +668,30 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_join
 
 /*
  * Class:     org_apache_mesos_MesosSchedulerDriver
+ * Method:    acknowledgeStatusUpdate
+ * Signature: (Lorg/apache/mesos/Protos/TaskStatus;)Lorg/apache/mesos/Protos/Status;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_acknowledgeStatusUpdate
+  (JNIEnv* env, jobject thiz, jobject jtaskStatus)
+{
+  // Construct a C++ TaskStatus from the Java TaskStatus.
+  const TaskStatus& taskStatus = construct<TaskStatus>(env, jtaskStatus);
+
+  // Now invoke the underlying driver.
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
+  MesosSchedulerDriver* driver =
+    (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
+
+  Status status = driver->acknowledgeStatusUpdate(taskStatus);
+
+  return convert<Status>(env, status);
+}
+
+
+/*
+ * Class:     org_apache_mesos_MesosSchedulerDriver
  * Method:    sendFrameworkMessage
  * Signature: (Lorg/apache/mesos/Protos/ExecutorID;Lorg/apache/mesos/Protos/SlaveID;[B)Lorg/apache/mesos/Protos/Status;
  */

http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
index 6ad03ce..a1055a5 100644
--- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
@@ -106,14 +106,15 @@ public class MesosSchedulerDriver implements SchedulerDriver {
     this.scheduler = scheduler;
     this.framework = framework;
     this.master = master;
+    this.implicitAcknowledgements = true;
     this.credential = null;
 
     initialize();
   }
 
   /**
-   * Same as the above constructor, except that it accepts 'credential'
-   * as a parameter.
+   * Same as the other constructors, except that it accepts the newly
+   * introduced 'credential' parameter.
    *
    * @param scheduler   The scheduler implementation which callbacks are invoked
    *                    upon scheduler events.
@@ -146,6 +147,92 @@ public class MesosSchedulerDriver implements SchedulerDriver {
     this.scheduler = scheduler;
     this.framework = framework;
     this.master = master;
+    this.implicitAcknowledgements = true;
+    this.credential = credential;
+
+    initialize();
+  }
+
+  /**
+   * Same as the other constructors, except that it accepts the newly
+   * introduced 'implicitAcknowledgements' parameter.
+   *
+   * @param scheduler   The scheduler implementation which callbacks are invoked
+   *                    upon scheduler events.
+   * @param framework   The frameworkInfo describing the current framework.
+   * @param master      The address to the currently active Mesos master.
+   * @param implicitAcknowledgements  Whether the driver should send
+   *            acknowledgements on behalf of the scheduler. Setting this to
+   *            false allows schedulers to perform their own acknowledgements,
+   *            which enables asynchronous / batch processing of status updates.
+   */
+  public MesosSchedulerDriver(Scheduler scheduler,
+                              FrameworkInfo framework,
+                              String master,
+                              boolean implicitAcknowledgements) {
+
+    if (scheduler == null) {
+      throw new NullPointerException("Not expecting a null Scheduler");
+    }
+
+    if (framework == null) {
+      throw new NullPointerException("Not expecting a null FrameworkInfo");
+    }
+
+    if (master == null) {
+      throw new NullPointerException("Not expecting a null master");
+    }
+
+    this.scheduler = scheduler;
+    this.framework = framework;
+    this.master = master;
+    this.implicitAcknowledgements = implicitAcknowledgements;
+    this.credential = null;
+
+    initialize();
+  }
+
+  /**
+   * Same as the other constructors, except that it accepts the newly
+   * introduced 'implicitAcknowledgements' and 'credential' parameters.
+   *
+   * @param scheduler   The scheduler implementation which callbacks are invoked
+   *                    upon scheduler events.
+   * @param framework   The frameworkInfo describing the current framework.
+   * @param master      The address to the currently active Mesos master.
+   * @param implicitAcknowledgements  Whether the driver should send
+   *            acknowledgements on behalf of the scheduler. Setting this to
+   *            false allows schedulers to perform their own acknowledgements,
+   *            which enables asynchronous / batch processing of status updates.
+   * @param credential  The credentials that will be used to authenticate
+   *                    calls from this scheduler.
+   */
+  public MesosSchedulerDriver(Scheduler scheduler,
+                              FrameworkInfo framework,
+                              String master,
+                              boolean implicitAcknowledgements,
+                              Credential credential) {
+
+    if (scheduler == null) {
+      throw new NullPointerException("Not expecting a null Scheduler");
+    }
+
+    if (framework == null) {
+      throw new NullPointerException("Not expecting a null FrameworkInfo");
+    }
+
+    if (master == null) {
+      throw new NullPointerException("Not expecting a null master");
+    }
+
+    if (credential == null) {
+      throw new NullPointerException("Not expecting a null credential");
+    }
+
+    this.scheduler = scheduler;
+    this.framework = framework;
+    this.master = master;
+    this.implicitAcknowledgements = implicitAcknowledgements;
     this.credential = credential;
 
     initialize();
@@ -197,6 +284,8 @@ public class MesosSchedulerDriver implements SchedulerDriver {
 
   public native Status reviveOffers();
 
+  public native Status acknowledgeStatusUpdate(TaskStatus status);
+
   public native Status sendFrameworkMessage(ExecutorID executorId,
                                             SlaveID slaveId,
                                             byte[] data);
@@ -209,6 +298,7 @@ public class MesosSchedulerDriver implements SchedulerDriver {
   private final Scheduler scheduler;
   private final FrameworkInfo framework;
   private final String master;
+  private final boolean implicitAcknowledgements;
   private final Credential credential;
 
   private long __scheduler;

http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/src/org/apache/mesos/Scheduler.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/Scheduler.java b/src/java/src/org/apache/mesos/Scheduler.java
index 337e455..0e02f89 100644
--- a/src/java/src/org/apache/mesos/Scheduler.java
+++ b/src/java/src/org/apache/mesos/Scheduler.java
@@ -112,12 +112,15 @@ public interface Scheduler {
   /**
    * Invoked when the status of a task has changed (e.g., a slave is
    * lost and so the task is lost, a task finishes and an executor
-   * sends a status update saying so, etc). Note that returning from
-   * this callback _acknowledges_ receipt of this status update! If
-   * for whatever reason the scheduler aborts during this callback (or
+   * sends a status update saying so, etc). If implicit
+   * acknowledgements are being used, then returning from this
+   * callback _acknowledges_ receipt of this status update! If for
+   * whatever reason the scheduler aborts during this callback (or
    * the process exits) another status update will be delivered (note,
    * however, that this is currently not true if the slave sending the
-   * status update is lost/fails during that time).
+   * status update is lost/fails during that time). If explicit
+   * acknowledgements are in use, the scheduler must acknowledge this
+   * status on the driver.
    *
    * @param driver The driver that was used to run this scheduler.
    * @param status The status update, which includes the task ID and status.

http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index e2d1f92..d5b100a 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -227,6 +227,21 @@ public interface SchedulerDriver {
   Status reviveOffers();
 
   /**
+   * Acknowledges the status update. This should only be called
+   * once the status update is processed durably by the scheduler.
+   * Note that explicit acknowledgements must be requested via the
+   * constructor argument, otherwise a call to this method will
+   * cause the driver to crash.
+   *
+   * @param status  The status to acknowledge.
+   *
+   * @return        The state of the driver after the call.
+   *
+   * @see TaskStatus
+   */
+  Status acknowledgeStatusUpdate(TaskStatus status);
+
+  /**
    * Sends a message from the framework to one of its executors. These
    * messages are best effort; do not expect a framework message to be
    * retransmitted in any reliable fashion.