You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:56 UTC

[6/8] mesos git commit: Updated test frameworks for explicit acknowledgements.

Updated test frameworks for explicit acknowledgements.

Review: https://reviews.apache.org/r/30976


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1aeec05
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1aeec05
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1aeec05

Branch: refs/heads/master
Commit: e1aeec052cf9c8cec1cf14f4c72c83e2d6b68f02
Parents: a2626d6
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:31:27 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:33 2015 -0800

----------------------------------------------------------------------
 src/examples/java/TestFramework.java  | 35 ++++++++++++++++++++-----
 src/examples/python/test_framework.py | 21 ++++++++++++---
 src/examples/test_framework.cpp       | 41 ++++++++++++++++++++++++------
 3 files changed, 78 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1aeec05/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index ce87de8..65ba9d9 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -31,11 +31,15 @@ import org.apache.mesos.Protos.*;
 
 public class TestFramework {
   static class TestScheduler implements Scheduler {
-    public TestScheduler(ExecutorInfo executor) {
-      this(executor, 5);
+    public TestScheduler(boolean implicitAcknowledgements,
+                         ExecutorInfo executor) {
+      this(implicitAcknowledgements, executor, 5);
     }
 
-    public TestScheduler(ExecutorInfo executor, int totalTasks) {
+    public TestScheduler(boolean implicitAcknowledgements,
+                         ExecutorInfo executor,
+                         int totalTasks) {
+      this.implicitAcknowledgements = implicitAcknowledgements;
       this.executor = executor;
       this.totalTasks = totalTasks;
     }
@@ -139,6 +143,10 @@ public class TestFramework {
                            " with message '" + status.getMessage() + "'");
         driver.abort();
       }
+
+      if (!implicitAcknowledgements) {
+        driver.acknowledgeStatusUpdate(status);
+      }
     }
 
     @Override
@@ -160,6 +168,7 @@ public class TestFramework {
       System.out.println("Error: " + message);
     }
 
+    private final boolean implicitAcknowledgements;
     private final ExecutorInfo executor;
     private final int totalTasks;
     private int launchedTasks = 0;
@@ -197,9 +206,16 @@ public class TestFramework {
       frameworkBuilder.setCheckpoint(true);
     }
 
+    boolean implicitAcknowledgements = true;
+
+    if (System.getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS") != null) {
+      System.out.println("Enabling explicit acknowledgements for status updates");
+      implicitAcknowledgements = false;
+    }
+
     Scheduler scheduler = args.length == 1
-        ? new TestScheduler(executor)
-        : new TestScheduler(executor, Integer.parseInt(args[1]));
+        ? new TestScheduler(implicitAcknowledgements, executor)
+        : new TestScheduler(implicitAcknowledgements, executor, Integer.parseInt(args[1]));
 
     MesosSchedulerDriver driver = null;
     if (System.getenv("MESOS_AUTHENTICATE") != null) {
@@ -222,11 +238,16 @@ public class TestFramework {
 
       frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL"));
 
-      driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], credential);
+      driver = new MesosSchedulerDriver(
+          scheduler,
+          frameworkBuilder.build(),
+          args[0],
+          implicitAcknowledgements,
+          credential);
     } else {
       frameworkBuilder.setPrincipal("test-framework-java");
 
-      driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
+      driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], implicitAcknowledgements);
     }
 
     int status = driver.run() == Status.DRIVER_STOPPED ? 0 : 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1aeec05/src/examples/python/test_framework.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_framework.py b/src/examples/python/test_framework.py
index aad6d77..2710614 100755
--- a/src/examples/python/test_framework.py
+++ b/src/examples/python/test_framework.py
@@ -30,7 +30,8 @@ TASK_CPUS = 1
 TASK_MEM = 128
 
 class TestScheduler(mesos.interface.Scheduler):
-    def __init__(self, executor):
+    def __init__(self, implicitAcknowledgements, executor):
+        self.implicitAcknowledgements = implicitAcknowledgements
         self.executor = executor
         self.taskData = {}
         self.tasksLaunched = 0
@@ -123,6 +124,11 @@ class TestScheduler(mesos.interface.Scheduler):
                 % (update.task_id.value, mesos_pb2.TaskState.Name(update.state), update.message)
             driver.abort()
 
+        # Explicitly acknowledge the update if implicit acknowledgements
+        # are not being used.
+        if not self.implicitAcknowledgements:
+            driver.acknowledgeStatusUpdate(update)
+
     def frameworkMessage(self, driver, executorId, slaveId, message):
         self.messagesReceived += 1
 
@@ -163,6 +169,11 @@ if __name__ == "__main__":
         print "Enabling checkpoint for the framework"
         framework.checkpoint = True
 
+    implicitAcknowledgements = 1
+    if os.getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS"):
+        print "Enabling explicit status update acknowledgements"
+        implicitAcknowledgements = 0
+
     if os.getenv("MESOS_AUTHENTICATE"):
         print "Enabling authentication for the framework"
 
@@ -181,17 +192,19 @@ if __name__ == "__main__":
         framework.principal = os.getenv("DEFAULT_PRINCIPAL")
 
         driver = mesos.native.MesosSchedulerDriver(
-            TestScheduler(executor),
+            TestScheduler(implicitAcknowledgements, executor),
             framework,
             sys.argv[1],
+            implicitAcknowledgements,
             credential)
     else:
         framework.principal = "test-framework-python"
 
         driver = mesos.native.MesosSchedulerDriver(
-            TestScheduler(executor),
+            TestScheduler(implicitAcknowledgements, executor),
             framework,
-            sys.argv[1])
+            sys.argv[1],
+            implicitAcknowledgements)
 
     status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1aeec05/src/examples/test_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp
index 0a4fde5..04468c2 100644
--- a/src/examples/test_framework.cpp
+++ b/src/examples/test_framework.cpp
@@ -56,8 +56,12 @@ const int32_t MEM_PER_TASK = 128;
 class TestScheduler : public Scheduler
 {
 public:
-  TestScheduler(const ExecutorInfo& _executor, const string& _role)
-    : executor(_executor),
+  TestScheduler(
+      bool _implicitAcknowledgements,
+      const ExecutorInfo& _executor,
+      const string& _role)
+    : implicitAcknowledgements(_implicitAcknowledgements),
+      executor(_executor),
       role(_role),
       tasksLaunched(0),
       tasksFinished(0),
@@ -127,8 +131,9 @@ public:
 
     cout << "Task " << taskId << " is in state " << status.state() << endl;
 
-    if (status.state() == TASK_FINISHED)
+    if (status.state() == TASK_FINISHED) {
       tasksFinished++;
+    }
 
     if (status.state() == TASK_LOST ||
         status.state() == TASK_KILLED ||
@@ -142,8 +147,13 @@ public:
       driver->abort();
     }
 
-    if (tasksFinished == totalTasks)
+    if (!implicitAcknowledgements) {
+      driver->acknowledgeStatusUpdate(status);
+    }
+
+    if (tasksFinished == totalTasks) {
       driver->stop();
+    }
   }
 
   virtual void frameworkMessage(SchedulerDriver* driver,
@@ -164,6 +174,7 @@ public:
   }
 
 private:
+  const bool implicitAcknowledgements;
   const ExecutorInfo executor;
   string role;
   int tasksLaunched;
@@ -223,8 +234,6 @@ int main(int argc, char** argv)
   executor.set_name("Test Executor (C++)");
   executor.set_source("cpp_test");
 
-  TestScheduler scheduler(executor, role);
-
   FrameworkInfo framework;
   framework.set_user(""); // Have Mesos fill in the current user.
   framework.set_name("Test Framework (C++)");
@@ -235,7 +244,16 @@ int main(int argc, char** argv)
         numify<bool>(os::getenv("MESOS_CHECKPOINT")).get());
   }
 
+  bool implicitAcknowledgements = true;
+  if (os::hasenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS")) {
+    cout << "Enabling explicit acknowledgements for status updates" << endl;
+
+    implicitAcknowledgements = false;
+  }
+
   MesosSchedulerDriver* driver;
+  TestScheduler scheduler(implicitAcknowledgements, executor, role);
+
   if (os::hasenv("MESOS_AUTHENTICATE")) {
     cout << "Enabling authentication for the framework" << endl;
 
@@ -254,12 +272,19 @@ int main(int argc, char** argv)
     framework.set_principal(getenv("DEFAULT_PRINCIPAL"));
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, master.get(), credential);
+        &scheduler,
+        framework,
+        master.get(),
+        implicitAcknowledgements,
+        credential);
   } else {
     framework.set_principal("test-framework-cpp");
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, master.get());
+        &scheduler,
+        framework,
+        master.get(),
+        implicitAcknowledgements);
   }
 
   int status = driver->run() == DRIVER_STOPPED ? 0 : 1;