You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/12 21:23:34 UTC

[1/5] mesos git commit: Updated the long-lived-framework example.

Repository: mesos
Updated Branches:
  refs/heads/master 5c04712d9 -> b36596b9d


Updated the long-lived-framework example.

This gives the example `long-lived-framework` enough options to run
outside of the build environment.

This also:

* Updates the style of the framework code.
* Gives the `ExecutorInfo` some resources (needed for some cgroups
  isolators).
* Restricts the framework to one agent. Otherwise, it would grab a
  small chunk of every machine in the cluster.
* Adds filters for declined offers.

Review: https://reviews.apache.org/r/45067/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/debf0ac6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/debf0ac6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/debf0ac6

Branch: refs/heads/master
Commit: debf0ac6b84ee9beec9bc06c3aac81f2dc833f93
Parents: 5c04712
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Tue Apr 12 12:23:09 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Apr 12 12:23:09 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_framework.cpp | 295 ++++++++++++++++++++---------
 1 file changed, 204 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/debf0ac6/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index ef498d6..29ee6f2 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -14,14 +14,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <glog/logging.h>
+
 #include <iostream>
 #include <string>
 
-#include <boost/lexical_cast.hpp>
-
+#include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
-#include <stout/numify.hpp>
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
@@ -29,23 +31,32 @@
 
 using namespace mesos;
 
-using boost::lexical_cast;
-
-using std::cout;
-using std::cerr;
-using std::endl;
-using std::flush;
 using std::string;
 using std::vector;
 
-const int32_t CPUS_PER_TASK = 1;
-const int32_t MEM_PER_TASK = 32;
 
+// NOTE: Per-task resources are nominal because all of the resources for the
+// container are provisioned when the executor is created. The executor can
+// run multiple tasks at once, but uses a constant amount of resources
+// regardless of the number of tasks.
+const double CPUS_PER_TASK = 0.001;
+const int32_t MEM_PER_TASK = 1;
+
+const double CPUS_PER_EXECUTOR = 0.1;
+const int32_t MEM_PER_EXECUTOR = 32;
+
+
+// This scheduler picks one slave and repeatedly launches sleep tasks on it,
+// using a single multi-task executor. If the slave or executor fails, the
+// scheduler will pick another slave and continue launching sleep tasks.
 class LongLivedScheduler : public Scheduler
 {
 public:
   explicit LongLivedScheduler(const ExecutorInfo& _executor)
     : executor(_executor),
+      taskResources(Resources::parse(
+          "cpus:" + stringify(CPUS_PER_TASK) +
+          ";mem:" + stringify(MEM_PER_TASK)).get()),
       tasksLaunched(0) {}
 
   virtual ~LongLivedScheduler() {}
@@ -54,70 +65,58 @@ public:
                           const FrameworkID&,
                           const MasterInfo&)
   {
-    cout << "Registered!" << endl;
+    LOG(INFO) << "Registered!";
   }
 
-  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {}
+  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo)
+  {
+    LOG(INFO) << "Re-registered!";
+  }
 
-  virtual void disconnected(SchedulerDriver* driver) {}
+  virtual void disconnected(SchedulerDriver* driver)
+  {
+    LOG(INFO) << "Disconnected!";
+  }
 
   virtual void resourceOffers(SchedulerDriver* driver,
                               const vector<Offer>& offers)
   {
-    cout << "." << flush;
-    for (size_t i = 0; i < offers.size(); i++) {
-      const Offer& offer = offers[i];
-
-      // Lookup resources we care about.
-      // TODO(benh): It would be nice to ultimately have some helper
-      // functions for looking up resources.
-      double cpus = 0;
-      double mem = 0;
-
-      for (int i = 0; i < offer.resources_size(); i++) {
-        const Resource& resource = offer.resources(i);
-        if (resource.name() == "cpus" &&
-            resource.type() == Value::SCALAR) {
-          cpus = resource.scalar().value();
-        } else if (resource.name() == "mem" &&
-                   resource.type() == Value::SCALAR) {
-          mem = resource.scalar().value();
-        }
-      }
+    static const Resources EXECUTOR_RESOURCES = Resources(executor.resources());
 
-      // Launch tasks (only one per offer).
-      vector<TaskInfo> tasks;
-      if (cpus >= CPUS_PER_TASK && mem >= MEM_PER_TASK) {
-        int taskId = tasksLaunched++;
+    foreach (const Offer& offer, offers) {
+      if (slaveId.isNone()) {
+        // No active executor running in the cluster.
+        // Launch a new task with executor.
 
-        cout << "Starting task " << taskId << " on "
-             << offer.hostname() << endl;
+        if (Resources(offer.resources()).flatten()
+            .contains(EXECUTOR_RESOURCES + taskResources)) {
+          LOG(INFO)
+            << "Starting executor and task " << tasksLaunched
+            << " on " << offer.hostname();
 
-        TaskInfo task;
-        task.set_name("Task " + lexical_cast<string>(taskId));
-        task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
-        task.mutable_slave_id()->MergeFrom(offer.slave_id());
-        task.mutable_executor()->MergeFrom(executor);
+          launchTask(driver, offer);
 
-        Resource* resource;
-
-        resource = task.add_resources();
-        resource->set_name("cpus");
-        resource->set_type(Value::SCALAR);
-        resource->mutable_scalar()->set_value(CPUS_PER_TASK);
-
-        resource = task.add_resources();
-        resource->set_name("mem");
-        resource->set_type(Value::SCALAR);
-        resource->mutable_scalar()->set_value(MEM_PER_TASK);
+          slaveId = offer.slave_id();
+        } else {
+          declineOffer(driver, offer);
+        }
+      } else if (slaveId == offer.slave_id()) {
+        // Offer from the same slave that has an active executor.
+        // Launch more tasks on that executor.
 
-        tasks.push_back(task);
+        if (Resources(offer.resources()).flatten().contains(taskResources)) {
+          LOG(INFO)
+            << "Starting task " << tasksLaunched << " on " << offer.hostname();
 
-        cpus -= CPUS_PER_TASK;
-        mem -= MEM_PER_TASK;
+          launchTask(driver, offer);
+        } else {
+          declineOffer(driver, offer);
+        }
+      } else {
+        // We have an active executor but this offer comes from a
+        // different slave; decline the offer.
+        declineOffer(driver, offer);
       }
-
-      driver->launchTasks(offer.id(), tasks);
     }
   }
 
@@ -126,9 +125,10 @@ public:
 
   virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
   {
-    int taskId = lexical_cast<int>(status.task_id().value());
-    cout << "Task " << taskId << " is in state "
-         << TaskState_Name(status.state()) << endl;
+    LOG(INFO)
+      << "Task " << status.task_id().value()
+      << " is in state " << TaskState_Name(status.state())
+      << (status.has_message() ? " with message: " + status.message() : "");
   }
 
   virtual void frameworkMessage(SchedulerDriver* driver,
@@ -136,63 +136,176 @@ public:
                                 const SlaveID& slaveId,
                                 const string& data) {}
 
-  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {}
+  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& _slaveId)
+  {
+    LOG(INFO) << "Slave lost: " << _slaveId;
+
+    if (slaveId == _slaveId) {
+      slaveId = None();
+    }
+  }
 
   virtual void executorLost(SchedulerDriver* driver,
                             const ExecutorID& executorId,
-                            const SlaveID& slaveId,
-                            int status) {}
+                            const SlaveID& _slaveId,
+                            int status)
+  {
+    LOG(INFO)
+      << "Executor '" << executorId << "' lost on slave "
+      << _slaveId << " with status: " << status;
+
+    slaveId = None();
+  }
 
   virtual void error(SchedulerDriver* driver, const string& message) {}
 
 private:
+  // Helper to decline an offer.
+  void declineOffer(SchedulerDriver* driver, const Offer& offer)
+  {
+    Filters filters;
+    filters.set_refuse_seconds(600);
+
+    driver->declineOffer(offer.id(), filters);
+  }
+
+  // Helper to launch a task using an offer.
+  void launchTask(SchedulerDriver* driver, const Offer& offer)
+  {
+    int taskId = tasksLaunched++;
+
+    TaskInfo task;
+    task.set_name("Task " + stringify(taskId));
+    task.mutable_task_id()->set_value(stringify(taskId));
+    task.mutable_slave_id()->MergeFrom(offer.slave_id());
+    task.mutable_resources()->CopyFrom(taskResources);
+    task.mutable_executor()->CopyFrom(executor);
+
+    driver->launchTasks(offer.id(), {task});
+  }
+
   const ExecutorInfo executor;
+  const Resources taskResources;
   string uri;
   int tasksLaunched;
+
+  // The slave that is running the long-lived-executor.
+  // Unless that slave/executor dies, this framework will not launch
+  // an executor on any other slave.
+  Option<SlaveID> slaveId;
+};
+
+
+class Flags : public flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    add(&master,
+        "master",
+        "Master to connect to.",
+        [](const Option<string>& value) -> Option<Error> {
+          if (value.isNone()) {
+            return Error("Missing --master");
+          }
+
+          return None();
+        });
+
+    add(&build_dir,
+        "build_dir",
+        "The build directory of Mesos. If set, the framework will assume\n"
+        "that the executor, framework, and agent(s) all live on the same\n"
+        "machine.");
+
+    add(&executor_uri,
+        "executor_uri",
+        "URI the fetcher should use to get the executor.");
+
+    add(&executor_command,
+        "executor_command",
+        "The command that should be used to start the executor.\n"
+        "This will override the value set by `--build_dir`.");
+
+    add(&checkpoint,
+        "checkpoint",
+        "Whether this framework should be checkpointed.",
+        false);
+  }
+
+  Option<string> master;
+
+  // Flags for specifying the executor binary.
+  Option<string> build_dir;
+  Option<string> executor_uri;
+  Option<string> executor_command;
+
+  bool checkpoint;
 };
 
 
 int main(int argc, char** argv)
 {
-  if (argc != 2) {
-    cerr << "Usage: " << argv[0] << " <master>" << endl;
-    return -1;
+  Flags flags;
+  Try<Nothing> load = flags.load("MESOS_", argc, argv);
+
+  if (load.isError()) {
+    EXIT(EXIT_FAILURE) << flags.usage(load.error());
   }
 
+  const Resources resources = Resources::parse(
+      "cpus:" + stringify(CPUS_PER_EXECUTOR) +
+      ";mem:" + stringify(MEM_PER_EXECUTOR)).get();
+
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.mutable_resources()->CopyFrom(resources);
+  executor.set_name("Long Lived Executor (C++)");
+  executor.set_source("cpp_long_lived_framework");
+
+  // Determine the command to run the executor based on three possibilities:
+  //   1) `--executor_command` was set, which overrides the below cases.
+  //   2) We are in the Mesos build directory, so the targeted executable
+  //      is actually a libtool wrapper script.
+  //   3) We have not detected the Mesos build directory, so assume the
+  //      executor is in the same directory as the framework.
+  string command;
+
   // Find this executable's directory to locate executor.
-  string uri;
-  Option<string> value = os::getenv("MESOS_BUILD_DIR");
-  if (value.isSome()) {
-    uri = path::join(value.get(), "src", "long-lived-executor");
+  if (flags.executor_command.isSome()) {
+    command = flags.executor_command.get();
+  } else if (flags.build_dir.isSome()) {
+    command = path::join(
+        flags.build_dir.get(), "src", "long-lived-executor");
   } else {
-    uri = path::join(
+    command = path::join(
         os::realpath(Path(argv[0]).dirname()).get(),
         "long-lived-executor");
   }
 
-  ExecutorInfo executor;
-  executor.mutable_executor_id()->set_value("default");
-  executor.mutable_command()->set_value(uri);
-  executor.set_name("Long Lived Executor (C++)");
-  executor.set_source("cpp_long_lived_framework");
+  executor.mutable_command()->set_value(command);
+
+  // Copy `--executor_uri` into the command.
+  if (flags.executor_uri.isSome()) {
+    mesos::CommandInfo::URI* uri = executor.mutable_command()->add_uris();
+    uri->set_value(flags.executor_uri.get());
+    uri->set_executable(true);
+  }
 
   LongLivedScheduler scheduler(executor);
 
   FrameworkInfo framework;
-  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_user(os::user().get());
   framework.set_name("Long Lived Framework (C++)");
-
-  value = os::getenv("MESOS_CHECKPOINT");
-  if (value.isSome()) {
-    framework.set_checkpoint(
-        numify<bool>(value.get()).get());
-  }
+  framework.set_checkpoint(flags.checkpoint);
 
   MesosSchedulerDriver* driver;
+
+  // TODO(josephw): Refactor these into a common set of flags.
   if (os::getenv("MESOS_AUTHENTICATE").isSome()) {
-    cout << "Enabling authentication for the framework" << endl;
+    LOG(INFO) << "Enabling authentication for the framework";
 
-    value = os::getenv("DEFAULT_PRINCIPAL");
+    Option<string> value = os::getenv("DEFAULT_PRINCIPAL");
     if (value.isNone()) {
       EXIT(EXIT_FAILURE)
         << "Expecting authentication principal in the environment";
@@ -212,12 +325,12 @@ int main(int argc, char** argv)
     credential.set_secret(value.get());
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, argv[1], credential);
+        &scheduler, framework, flags.master.get(), credential);
   } else {
     framework.set_principal("long-lived-framework-cpp");
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, argv[1]);
+        &scheduler, framework, flags.master.get());
   }
 
   int status = driver->run() == DRIVER_STOPPED ? 0 : 1;


[2/5] mesos git commit: Added some metrics to the long-lived-framework example.

Posted by vi...@apache.org.
Added some metrics to the long-lived-framework example.

Adds metrics to gauge the health of the framework.  This includes:

* uptime_secs = How long the framework has been running.
* registered = If the framework is registered.
* offers_received = A counter used to determine if the framework is
  starved or not.
* tasks_launched = Number of tasks launched.
* abnormal_terminations = Number of terminal status updates which
  were not `TASK_FINISHED`.

Also adds an endpoint `/framework/counters` that returns the list of
metrics that are "counters".

Review: https://reviews.apache.org/r/45440/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c82c12ae
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c82c12ae
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c82c12ae

Branch: refs/heads/master
Commit: c82c12aecea0e44214e0a9ac8ad02e33aa197274
Parents: debf0ac
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Tue Apr 12 12:23:13 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Apr 12 12:23:13 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_framework.cpp | 136 ++++++++++++++++++++++++++++-
 1 file changed, 134 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c82c12ae/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index 29ee6f2..035ddda 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -22,6 +22,18 @@
 #include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <process/clock.hpp>
+#include <process/defer.hpp>
+#include <process/help.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/time.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/gauge.hpp>
+#include <process/metrics/metrics.hpp>
+
 #include <stout/flags.hpp>
 #include <stout/foreach.hpp>
 #include <stout/option.hpp>
@@ -34,6 +46,18 @@ using namespace mesos;
 using std::string;
 using std::vector;
 
+using process::AUTHENTICATION;
+using process::Clock;
+using process::defer;
+using process::DESCRIPTION;
+using process::HELP;
+using process::TLDR;
+
+using process::http::OK;
+
+using process::metrics::Gauge;
+using process::metrics::Counter;
+
 
 // NOTE: Per-task resources are nominal because all of the resources for the
 // container are provisioned when the executor is created. The executor can
@@ -57,25 +81,39 @@ public:
       taskResources(Resources::parse(
           "cpus:" + stringify(CPUS_PER_TASK) +
           ";mem:" + stringify(MEM_PER_TASK)).get()),
-      tasksLaunched(0) {}
+      tasksLaunched(0),
+      metrics(*this)
+  {
+    process::spawn(metrics);
+  }
 
-  virtual ~LongLivedScheduler() {}
+  virtual ~LongLivedScheduler()
+  {
+    process::terminate(metrics);
+    process::wait(metrics);
+  }
 
   virtual void registered(SchedulerDriver*,
                           const FrameworkID&,
                           const MasterInfo&)
   {
     LOG(INFO) << "Registered!";
+
+    metrics.isRegistered = true;
   }
 
   virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo)
   {
     LOG(INFO) << "Re-registered!";
+
+    metrics.isRegistered = true;
   }
 
   virtual void disconnected(SchedulerDriver* driver)
   {
     LOG(INFO) << "Disconnected!";
+
+    metrics.isRegistered = false;
   }
 
   virtual void resourceOffers(SchedulerDriver* driver,
@@ -83,6 +121,8 @@ public:
   {
     static const Resources EXECUTOR_RESOURCES = Resources(executor.resources());
 
+    metrics.offers_received += offers.size();
+
     foreach (const Offer& offer, offers) {
       if (slaveId.isNone()) {
         // No active executor running in the cluster.
@@ -129,6 +169,13 @@ public:
       << "Task " << status.task_id().value()
       << " is in state " << TaskState_Name(status.state())
       << (status.has_message() ? " with message: " + status.message() : "");
+
+    if (status.state() == TASK_KILLED ||
+        status.state() == TASK_LOST ||
+        status.state() == TASK_FAILED ||
+        status.state() == TASK_ERROR) {
+      ++metrics.abnormal_terminations;
+    }
   }
 
   virtual void frameworkMessage(SchedulerDriver* driver,
@@ -193,6 +240,91 @@ private:
   // Unless that slave/executor dies, this framework will not launch
   // an executor on any other slave.
   Option<SlaveID> slaveId;
+
+  struct Metrics : process::Process<Metrics>
+  {
+    Metrics(const LongLivedScheduler& _scheduler)
+      : ProcessBase("framework"),
+        scheduler(_scheduler),
+        isRegistered(false),
+        uptime_secs(
+            "long_lived_framework/uptime_secs",
+            defer(this, &Self::_uptime_secs)),
+        registered(
+            "long_lived_framework/registered",
+            defer(this, &Self::_registered)),
+        offers_received("long_lived_framework/offers_received"),
+        tasks_launched(
+            "long_lived_framework/tasks_launched",
+            defer(this, &Self::_tasksLaunched)),
+        abnormal_terminations("long_lived_framework/abnormal_terminations")
+    {
+      start_time = Clock::now();
+
+      process::metrics::add(uptime_secs);
+      process::metrics::add(registered);
+      process::metrics::add(offers_received);
+      process::metrics::add(tasks_launched);
+      process::metrics::add(abnormal_terminations);
+    }
+
+    virtual void initialize()
+    {
+      // Special route for metric metadata.
+      route(
+          "/counters",
+          HELP(
+              TLDR("List of counter-type metrics."),
+              DESCRIPTION("Returns 200 OK iff the request is accepted."),
+              AUTHENTICATION(false)),
+          [this](const process::http::Request& request) {
+            JSON::Array array;
+            array.values.push_back("long_lived_framework/offers_received");
+            array.values.push_back(
+                "long_lived_framework/abnormal_terminations");
+
+            return OK(array, request.url.query.get("jsonp"));
+          });
+    }
+
+    ~Metrics()
+    {
+      process::metrics::remove(uptime_secs);
+      process::metrics::remove(registered);
+      process::metrics::remove(offers_received);
+      process::metrics::remove(tasks_launched);
+      process::metrics::remove(abnormal_terminations);
+    }
+
+    const LongLivedScheduler& scheduler;
+
+    process::Time start_time;
+    double _uptime_secs()
+    {
+      return (Clock::now() - start_time).secs();
+    }
+
+    bool isRegistered;
+    double _registered()
+    {
+      return isRegistered ? 1 : 0;
+    }
+
+    double _tasksLaunched()
+    {
+      return scheduler.tasksLaunched;
+    }
+
+    process::metrics::Gauge uptime_secs;
+    process::metrics::Gauge registered;
+
+    process::metrics::Counter offers_received;
+    process::metrics::Gauge tasks_launched;
+
+    // The only expected terminal state is TASK_FINISHED.
+    // Other terminal states are considered incorrect.
+    process::metrics::Counter abnormal_terminations;
+  } metrics;
 };
 
 


[4/5] mesos git commit: Moved long running framework to use the v1 API.

Posted by vi...@apache.org.
Moved long running framework to use the v1 API.

See summary. Also removed the AuthN code. Once the library
supports AuthN, we can bring it back.

Review: https://reviews.apache.org/r/45800/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f3aae0f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f3aae0f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f3aae0f0

Branch: refs/heads/master
Commit: f3aae0f04aacaaed1e014a010ed3c22cf5a3c075
Parents: a5b0cbe
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Apr 12 12:23:20 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Apr 12 12:23:20 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_framework.cpp | 343 +++++++++++++++++++++--------
 1 file changed, 255 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f3aae0f0/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index 7d8a5c3..f5700bd 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -17,15 +17,18 @@
 #include <glog/logging.h>
 
 #include <iostream>
+#include <queue>
 #include <string>
 
-#include <mesos/resources.hpp>
-#include <mesos/scheduler.hpp>
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
 
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/help.hpp>
 #include <process/http.hpp>
+#include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 #include <process/time.hpp>
@@ -41,16 +44,33 @@
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
 
-using namespace mesos;
-
+using std::queue;
 using std::string;
 using std::vector;
 
+using mesos::v1::AgentID;
+using mesos::v1::CommandInfo;
+using mesos::v1::ExecutorID;
+using mesos::v1::ExecutorInfo;
+using mesos::v1::Filters;
+using mesos::v1::FrameworkID;
+using mesos::v1::FrameworkInfo;
+using mesos::v1::Offer;
+using mesos::v1::Resources;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
 using process::AUTHENTICATION;
 using process::Clock;
 using process::defer;
 using process::DESCRIPTION;
 using process::HELP;
+using process::Owned;
 using process::TLDR;
 
 using process::http::OK;
@@ -70,14 +90,20 @@ const double CPUS_PER_EXECUTOR = 0.1;
 const int32_t MEM_PER_EXECUTOR = 32;
 
 
-// This scheduler picks one slave and repeatedly launches sleep tasks on it,
-// using a single multi-task executor. If the slave or executor fails, the
-// scheduler will pick another slave and continue launching sleep tasks.
-class LongLivedScheduler : public Scheduler
+// This scheduler picks one agent and repeatedly launches sleep tasks on it,
+// using a single multi-task executor. If the agent or executor fails, the
+// scheduler will pick another agent and continue launching sleep tasks.
+class LongLivedScheduler : public process::Process<LongLivedScheduler>
 {
 public:
-  explicit LongLivedScheduler(const ExecutorInfo& _executor)
-    : executor(_executor),
+  LongLivedScheduler(
+      const string& _master,
+      const FrameworkInfo& _framework,
+      const ExecutorInfo& _executor)
+    : state(DISCONNECTED),
+      master(_master),
+      framework(_framework),
+      executor(_executor),
       taskResources(Resources::parse(
           "cpus:" + stringify(CPUS_PER_TASK) +
           ";mem:" + stringify(MEM_PER_TASK)).get()),
@@ -93,15 +119,131 @@ public:
     process::wait(metrics);
   }
 
-  virtual void resourceOffers(SchedulerDriver* driver,
-                              const vector<Offer>& offers)
+protected:
+  virtual void initialize()
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new Mesos(
+      master,
+      mesos::ContentType::PROTOBUF,
+      process::defer(self(), &Self::connected),
+      process::defer(self(), &Self::disconnected),
+      process::defer(self(), &Self::received, lambda::_1)));
+  }
+
+  void connected()
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    LOG(INFO) << "Disconnected!";
+
+    state = DISCONNECTED;
+
+    metrics.isRegistered = false;
+  }
+
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(framework);
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      LOG(INFO) << "Received " << event.type() << " event";
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          LOG(INFO) << "Subscribed with ID '" << framework.id();
+
+          framework.mutable_id()->
+            CopyFrom(event.subscribed().framework_id());
+
+          state = SUBSCRIBED;
+
+          metrics.isRegistered = true;
+          break;
+        }
+
+        case Event::OFFERS: {
+          offers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::UPDATE: {
+          update(event.update().status());
+          break;
+        }
+
+        case Event::FAILURE: {
+          const Event::Failure& failure = event.failure();
+
+          if (failure.has_agent_id() && failure.has_executor_id()) {
+            executorFailed(
+                failure.executor_id(),
+                failure.agent_id(),
+                failure.has_status() ? Option<int>(failure.status()) : None());
+          } else {
+            CHECK(failure.has_agent_id());
+
+            agentFailed(failure.agent_id());
+          }
+          break;
+        }
+
+        case Event::ERROR: {
+          LOG(ERROR) << "Error: " << event.error().message();
+          break;
+        }
+
+        case Event::HEARTBEAT:
+        case Event::RESCIND:
+        case Event::MESSAGE: {
+          break;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
+      }
+    }
+  }
+
+  void offers(const vector<Offer>& offers)
   {
+    CHECK_EQ(SUBSCRIBED, state);
+
     static const Resources EXECUTOR_RESOURCES = Resources(executor.resources());
 
     metrics.offers_received += offers.size();
 
     foreach (const Offer& offer, offers) {
-      if (slaveId.isNone()) {
+      if (agentId.isNone()) {
         // No active executor running in the cluster.
         // Launch a new task with executor.
 
@@ -111,102 +253,158 @@ public:
             << "Starting executor and task " << tasksLaunched
             << " on " << offer.hostname();
 
-          launchTask(driver, offer);
+          launch(offer);
 
-          slaveId = offer.slave_id();
+          agentId = offer.agent_id();
         } else {
-          declineOffer(driver, offer);
+          decline(offer);
         }
-      } else if (slaveId == offer.slave_id()) {
-        // Offer from the same slave that has an active executor.
+      } else if (agentId == offer.agent_id()) {
+        // Offer from the same agent that has an active executor.
         // Launch more tasks on that executor.
 
         if (Resources(offer.resources()).flatten().contains(taskResources)) {
           LOG(INFO)
             << "Starting task " << tasksLaunched << " on " << offer.hostname();
 
-          launchTask(driver, offer);
+          launch(offer);
         } else {
-          declineOffer(driver, offer);
+          decline(offer);
         }
       } else {
         // We have an active executor but this offer comes from a
-        // different slave; decline the offer.
-        declineOffer(driver, offer);
+        // different agent; decline the offer.
+        decline(offer);
       }
     }
   }
 
-  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
+  void update(const TaskStatus& status)
   {
+    CHECK_EQ(SUBSCRIBED, state);
+
     LOG(INFO)
       << "Task " << status.task_id().value()
       << " is in state " << TaskState_Name(status.state())
       << (status.has_message() ? " with message: " + status.message() : "");
 
-    if (status.state() == TASK_KILLED ||
-        status.state() == TASK_LOST ||
-        status.state() == TASK_FAILED ||
-        status.state() == TASK_ERROR) {
+    if (status.has_uuid()) {
+      Call call;
+      call.set_type(Call::ACKNOWLEDGE);
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+
+      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+      acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
+      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+      acknowledge->set_uuid(status.uuid());
+
+      mesos->send(call);
+    }
+
+    if (status.state() == TaskState::TASK_KILLED ||
+        status.state() == TaskState::TASK_LOST ||
+        status.state() == TaskState::TASK_FAILED ||
+        status.state() == TaskState::TASK_ERROR) {
       ++metrics.abnormal_terminations;
     }
   }
 
-  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& _slaveId)
+  void agentFailed(const AgentID& _agentId)
   {
-    LOG(INFO) << "Slave lost: " << _slaveId;
+    CHECK_EQ(SUBSCRIBED, state);
+
+    LOG(INFO) << "Agent lost: " << _agentId;
 
-    if (slaveId == _slaveId) {
-      slaveId = None();
+    if (agentId == _agentId) {
+      agentId = None();
     }
   }
 
-  virtual void executorLost(SchedulerDriver* driver,
-                            const ExecutorID& executorId,
-                            const SlaveID& _slaveId,
-                            int status)
+  void executorFailed(
+      const ExecutorID& executorId,
+      const AgentID& _agentId,
+      const Option<int>& status)
   {
+    CHECK_EQ(SUBSCRIBED, state);
+
     LOG(INFO)
-      << "Executor '" << executorId << "' lost on slave "
-      << _slaveId << " with status: " << status;
+      << "Executor '" << executorId << "' lost on agent '" << _agentId
+      << (status.isSome() ? "' with status: " + stringify(status.get()) : "");
 
-    slaveId = None();
+    agentId = None();
   }
 
 private:
   // Helper to decline an offer.
-  void declineOffer(SchedulerDriver* driver, const Offer& offer)
+  void decline(const Offer& offer)
   {
     Filters filters;
     filters.set_refuse_seconds(600);
 
-    driver->declineOffer(offer.id(), filters);
+    Call call;
+    call.set_type(Call::DECLINE);
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(offer.id());
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos->send(call);
   }
 
   // Helper to launch a task using an offer.
-  void launchTask(SchedulerDriver* driver, const Offer& offer)
+  void launch(const Offer& offer)
   {
     int taskId = tasksLaunched++;
 
     TaskInfo task;
     task.set_name("Task " + stringify(taskId));
     task.mutable_task_id()->set_value(stringify(taskId));
-    task.mutable_slave_id()->MergeFrom(offer.slave_id());
+    task.mutable_agent_id()->MergeFrom(offer.agent_id());
     task.mutable_resources()->CopyFrom(taskResources);
     task.mutable_executor()->CopyFrom(executor);
 
-    driver->launchTasks(offer.id(), {task});
+    Call call;
+    call.set_type(Call::ACCEPT);
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+
+    operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+
+    mesos->send(call);
   }
 
+  enum State
+  {
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED
+  } state;
+
+  const string master;
+  FrameworkInfo framework;
   const ExecutorInfo executor;
   const Resources taskResources;
   string uri;
   int tasksLaunched;
 
-  // The slave that is running the long-lived-executor.
-  // Unless that slave/executor dies, this framework will not launch
-  // an executor on any other slave.
-  Option<SlaveID> slaveId;
+  // The agent that is running the long-lived-executor.
+  // Unless that agent/executor dies, this framework will not launch
+  // an executor on any other agent.
+  Option<AgentID> agentId;
+
+  Owned<Mesos> mesos;
 
   struct Metrics : process::Process<Metrics>
   {
@@ -386,57 +584,26 @@ int main(int argc, char** argv)
 
   // Copy `--executor_uri` into the command.
   if (flags.executor_uri.isSome()) {
-    mesos::CommandInfo::URI* uri = executor.mutable_command()->add_uris();
+    CommandInfo::URI* uri = executor.mutable_command()->add_uris();
     uri->set_value(flags.executor_uri.get());
     uri->set_executable(true);
   }
 
-  LongLivedScheduler scheduler(executor);
-
   FrameworkInfo framework;
   framework.set_user(os::user().get());
   framework.set_name("Long Lived Framework (C++)");
   framework.set_checkpoint(flags.checkpoint);
 
-  MesosSchedulerDriver* driver;
-
-  // TODO(josephw): Refactor these into a common set of flags.
-  if (os::getenv("MESOS_AUTHENTICATE").isSome()) {
-    LOG(INFO) << "Enabling authentication for the framework";
-
-    Option<string> value = os::getenv("DEFAULT_PRINCIPAL");
-    if (value.isNone()) {
-      EXIT(EXIT_FAILURE)
-        << "Expecting authentication principal in the environment";
-    }
-
-    Credential credential;
-    credential.set_principal(value.get());
-
-    framework.set_principal(value.get());
-
-    value = os::getenv("DEFAULT_SECRET");
-    if (value.isNone()) {
-      EXIT(EXIT_FAILURE)
-        << "Expecting authentication secret in the environment";
-    }
-
-    credential.set_secret(value.get());
-
-    driver = new MesosSchedulerDriver(
-        &scheduler, framework, flags.master.get(), credential);
-  } else {
-    framework.set_principal("long-lived-framework-cpp");
-
-    driver = new MesosSchedulerDriver(
-        &scheduler, framework, flags.master.get());
-  }
+  // TODO(anand): Add support for AuthN once MESOS-3923 is resolved.
 
-  int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
+  Owned<LongLivedScheduler> scheduler(
+      new LongLivedScheduler(
+        flags.master.get(),
+        framework,
+        executor));
 
-  // Ensure that the driver process terminates.
-  driver->stop();
+  process::spawn(scheduler.get());
+  process::wait(scheduler.get());
 
-  delete driver;
-  return status;
+  return EXIT_SUCCESS;
 }


[5/5] mesos git commit: Move metrics code into the scheduler body.

Posted by vi...@apache.org.
Move metrics code into the scheduler body.

Since the HTTP scheduler is itself a libprocess process, the nested
`Metrics` struct no longer needs to be a separate process. This moves
the process-specific code into the appropriate place in the scheduler.

Review: https://reviews.apache.org/r/45910/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b36596b9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b36596b9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b36596b9

Branch: refs/heads/master
Commit: b36596b9dcdc9609147f401c0011f712c55608f8
Parents: f3aae0f
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Tue Apr 12 12:23:24 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Apr 12 12:23:24 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_framework.cpp | 93 +++++++++---------------------
 1 file changed, 26 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b36596b9/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index f5700bd..853e676 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -110,14 +110,10 @@ public:
       tasksLaunched(0),
       metrics(*this)
   {
-    process::spawn(metrics);
+    start_time = Clock::now();
   }
 
-  virtual ~LongLivedScheduler()
-  {
-    process::terminate(metrics);
-    process::wait(metrics);
-  }
+  virtual ~LongLivedScheduler() {}
 
 protected:
   virtual void initialize()
@@ -144,8 +140,6 @@ protected:
     LOG(INFO) << "Disconnected!";
 
     state = DISCONNECTED;
-
-    metrics.isRegistered = false;
   }
 
   void doReliableRegistration()
@@ -185,8 +179,6 @@ protected:
             CopyFrom(event.subscribed().framework_id());
 
           state = SUBSCRIBED;
-
-          metrics.isRegistered = true;
           break;
         }
 
@@ -360,6 +352,7 @@ private:
   void launch(const Offer& offer)
   {
     int taskId = tasksLaunched++;
+    ++metrics.tasks_launched;
 
     TaskInfo task;
     task.set_name("Task " + stringify(taskId));
@@ -406,85 +399,51 @@ private:
 
   Owned<Mesos> mesos;
 
-  struct Metrics : process::Process<Metrics>
+  process::Time start_time;
+  double _uptime_secs()
+  {
+    return (Clock::now() - start_time).secs();
+  }
+
+  double _subscribed()
+  {
+    return state == SUBSCRIBED ? 1 : 0;
+  }
+
+  struct Metrics
   {
-    Metrics(const LongLivedScheduler& _scheduler)
-      : ProcessBase("framework"),
-        scheduler(_scheduler),
-        isRegistered(false),
-        uptime_secs(
+    Metrics(const LongLivedScheduler& scheduler)
+      : uptime_secs(
             "long_lived_framework/uptime_secs",
-            defer(this, &Self::_uptime_secs)),
-        registered(
-            "long_lived_framework/registered",
-            defer(this, &Self::_registered)),
+            defer(scheduler, &LongLivedScheduler::_uptime_secs)),
+        subscribed(
+            "long_lived_framework/subscribed",
+            defer(scheduler, &LongLivedScheduler::_subscribed)),
         offers_received("long_lived_framework/offers_received"),
-        tasks_launched(
-            "long_lived_framework/tasks_launched",
-            defer(this, &Self::_tasksLaunched)),
+        tasks_launched("long_lived_framework/tasks_launched"),
         abnormal_terminations("long_lived_framework/abnormal_terminations")
     {
-      start_time = Clock::now();
-
       process::metrics::add(uptime_secs);
-      process::metrics::add(registered);
+      process::metrics::add(subscribed);
       process::metrics::add(offers_received);
       process::metrics::add(tasks_launched);
       process::metrics::add(abnormal_terminations);
     }
 
-    virtual void initialize()
-    {
-      // Special route for metric metadata.
-      route(
-          "/counters",
-          HELP(
-              TLDR("List of counter-type metrics."),
-              DESCRIPTION("Returns 200 OK iff the request is accepted."),
-              AUTHENTICATION(false)),
-          [this](const process::http::Request& request) {
-            JSON::Array array;
-            array.values.push_back("long_lived_framework/offers_received");
-            array.values.push_back(
-                "long_lived_framework/abnormal_terminations");
-
-            return OK(array, request.url.query.get("jsonp"));
-          });
-    }
-
     ~Metrics()
     {
       process::metrics::remove(uptime_secs);
-      process::metrics::remove(registered);
+      process::metrics::remove(subscribed);
       process::metrics::remove(offers_received);
       process::metrics::remove(tasks_launched);
       process::metrics::remove(abnormal_terminations);
     }
 
-    const LongLivedScheduler& scheduler;
-
-    process::Time start_time;
-    double _uptime_secs()
-    {
-      return (Clock::now() - start_time).secs();
-    }
-
-    bool isRegistered;
-    double _registered()
-    {
-      return isRegistered ? 1 : 0;
-    }
-
-    double _tasksLaunched()
-    {
-      return scheduler.tasksLaunched;
-    }
-
     process::metrics::Gauge uptime_secs;
-    process::metrics::Gauge registered;
+    process::metrics::Gauge subscribed;
 
     process::metrics::Counter offers_received;
-    process::metrics::Gauge tasks_launched;
+    process::metrics::Counter tasks_launched;
 
     // The only expected terminal state is TASK_FINISHED.
     // Other terminal states are considered incorrect.


[3/5] mesos git commit: Cleaned up the virtual overloads in long lived framework.

Posted by vi...@apache.org.
Cleaned up the virtual overloads in long lived framework.

This change removes the unused virtual methods that will no longer be
needed after moving to the v1 API. It mainly makes the later reviews in
the chain easier to follow and should not be committed on its own.

Review: https://reviews.apache.org/r/45799/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a5b0cbee
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a5b0cbee
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a5b0cbee

Branch: refs/heads/master
Commit: a5b0cbee860dfdb5bc5b5355d11b66d0f765357a
Parents: c82c12a
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Apr 12 12:23:17 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Apr 12 12:23:17 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_framework.cpp | 33 ------------------------------
 1 file changed, 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a5b0cbee/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index 035ddda..7d8a5c3 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -93,29 +93,6 @@ public:
     process::wait(metrics);
   }
 
-  virtual void registered(SchedulerDriver*,
-                          const FrameworkID&,
-                          const MasterInfo&)
-  {
-    LOG(INFO) << "Registered!";
-
-    metrics.isRegistered = true;
-  }
-
-  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo)
-  {
-    LOG(INFO) << "Re-registered!";
-
-    metrics.isRegistered = true;
-  }
-
-  virtual void disconnected(SchedulerDriver* driver)
-  {
-    LOG(INFO) << "Disconnected!";
-
-    metrics.isRegistered = false;
-  }
-
   virtual void resourceOffers(SchedulerDriver* driver,
                               const vector<Offer>& offers)
   {
@@ -160,9 +137,6 @@ public:
     }
   }
 
-  virtual void offerRescinded(SchedulerDriver* driver,
-                              const OfferID& offerId) {}
-
   virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
   {
     LOG(INFO)
@@ -178,11 +152,6 @@ public:
     }
   }
 
-  virtual void frameworkMessage(SchedulerDriver* driver,
-                                const ExecutorID& executorId,
-                                const SlaveID& slaveId,
-                                const string& data) {}
-
   virtual void slaveLost(SchedulerDriver* driver, const SlaveID& _slaveId)
   {
     LOG(INFO) << "Slave lost: " << _slaveId;
@@ -204,8 +173,6 @@ public:
     slaveId = None();
   }
 
-  virtual void error(SchedulerDriver* driver, const string& message) {}
-
 private:
   // Helper to decline an offer.
   void declineOffer(SchedulerDriver* driver, const Offer& offer)