You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/12 21:23:37 UTC

[4/5] mesos git commit: Moved long running framework to use the v1 API.

Moved long running framework to use the v1 API.

See summary. Also removed the AuthN code. Once the library
supports AuthN, we can bring it back.

Review: https://reviews.apache.org/r/45800/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f3aae0f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f3aae0f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f3aae0f0

Branch: refs/heads/master
Commit: f3aae0f04aacaaed1e014a010ed3c22cf5a3c075
Parents: a5b0cbe
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Apr 12 12:23:20 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Apr 12 12:23:20 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_framework.cpp | 343 +++++++++++++++++++++--------
 1 file changed, 255 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f3aae0f0/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index 7d8a5c3..f5700bd 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -17,15 +17,18 @@
 #include <glog/logging.h>
 
 #include <iostream>
+#include <queue>
 #include <string>
 
-#include <mesos/resources.hpp>
-#include <mesos/scheduler.hpp>
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
 
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/help.hpp>
 #include <process/http.hpp>
+#include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 #include <process/time.hpp>
@@ -41,16 +44,33 @@
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
 
-using namespace mesos;
-
+using std::queue;
 using std::string;
 using std::vector;
 
+using mesos::v1::AgentID;
+using mesos::v1::CommandInfo;
+using mesos::v1::ExecutorID;
+using mesos::v1::ExecutorInfo;
+using mesos::v1::Filters;
+using mesos::v1::FrameworkID;
+using mesos::v1::FrameworkInfo;
+using mesos::v1::Offer;
+using mesos::v1::Resources;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
 using process::AUTHENTICATION;
 using process::Clock;
 using process::defer;
 using process::DESCRIPTION;
 using process::HELP;
+using process::Owned;
 using process::TLDR;
 
 using process::http::OK;
@@ -70,14 +90,20 @@ const double CPUS_PER_EXECUTOR = 0.1;
 const int32_t MEM_PER_EXECUTOR = 32;
 
 
-// This scheduler picks one slave and repeatedly launches sleep tasks on it,
-// using a single multi-task executor. If the slave or executor fails, the
-// scheduler will pick another slave and continue launching sleep tasks.
-class LongLivedScheduler : public Scheduler
+// This scheduler picks one agent and repeatedly launches sleep tasks on it,
+// using a single multi-task executor. If the agent or executor fails, the
+// scheduler will pick another agent and continue launching sleep tasks.
+class LongLivedScheduler : public process::Process<LongLivedScheduler>
 {
 public:
-  explicit LongLivedScheduler(const ExecutorInfo& _executor)
-    : executor(_executor),
+  LongLivedScheduler(
+      const string& _master,
+      const FrameworkInfo& _framework,
+      const ExecutorInfo& _executor)
+    : state(DISCONNECTED),
+      master(_master),
+      framework(_framework),
+      executor(_executor),
       taskResources(Resources::parse(
           "cpus:" + stringify(CPUS_PER_TASK) +
           ";mem:" + stringify(MEM_PER_TASK)).get()),
@@ -93,15 +119,131 @@ public:
     process::wait(metrics);
   }
 
-  virtual void resourceOffers(SchedulerDriver* driver,
-                              const vector<Offer>& offers)
+protected:
+  virtual void initialize()
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new Mesos(
+      master,
+      mesos::ContentType::PROTOBUF,
+      process::defer(self(), &Self::connected),
+      process::defer(self(), &Self::disconnected),
+      process::defer(self(), &Self::received, lambda::_1)));
+  }
+
+  void connected()
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    LOG(INFO) << "Disconnected!";
+
+    state = DISCONNECTED;
+
+    metrics.isRegistered = false;
+  }
+
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(framework);
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  // Processes queued scheduler events in order, dispatching on event type.
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      LOG(INFO) << "Received " << event.type() << " event";
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          // Record the assigned ID first so that the log shows it.
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+
+          LOG(INFO) << "Subscribed with ID '" << framework.id() << "'";
+
+          state = SUBSCRIBED;
+          metrics.isRegistered = true;
+          break;
+        }
+
+        case Event::OFFERS: {
+          offers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::UPDATE: {
+          update(event.update().status());
+          break;
+        }
+
+        case Event::FAILURE: {
+          const Event::Failure& failure = event.failure();
+
+          if (failure.has_agent_id() && failure.has_executor_id()) {
+            executorFailed(
+                failure.executor_id(),
+                failure.agent_id(),
+                failure.has_status() ? Option<int>(failure.status()) : None());
+          } else {
+            CHECK(failure.has_agent_id());
+
+            agentFailed(failure.agent_id());
+          }
+          break;
+        }
+
+        case Event::ERROR: {
+          LOG(ERROR) << "Error: " << event.error().message();
+          break;
+        }
+
+        case Event::HEARTBEAT:
+        case Event::RESCIND:
+        case Event::MESSAGE: {
+          break;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
+      }
+    }
+  }
+
+  void offers(const vector<Offer>& offers)
   {
+    CHECK_EQ(SUBSCRIBED, state);
+
     static const Resources EXECUTOR_RESOURCES = Resources(executor.resources());
 
     metrics.offers_received += offers.size();
 
     foreach (const Offer& offer, offers) {
-      if (slaveId.isNone()) {
+      if (agentId.isNone()) {
         // No active executor running in the cluster.
         // Launch a new task with executor.
 
@@ -111,102 +253,158 @@ public:
             << "Starting executor and task " << tasksLaunched
             << " on " << offer.hostname();
 
-          launchTask(driver, offer);
+          launch(offer);
 
-          slaveId = offer.slave_id();
+          agentId = offer.agent_id();
         } else {
-          declineOffer(driver, offer);
+          decline(offer);
         }
-      } else if (slaveId == offer.slave_id()) {
-        // Offer from the same slave that has an active executor.
+      } else if (agentId == offer.agent_id()) {
+        // Offer from the same agent that has an active executor.
         // Launch more tasks on that executor.
 
         if (Resources(offer.resources()).flatten().contains(taskResources)) {
           LOG(INFO)
             << "Starting task " << tasksLaunched << " on " << offer.hostname();
 
-          launchTask(driver, offer);
+          launch(offer);
         } else {
-          declineOffer(driver, offer);
+          decline(offer);
         }
       } else {
         // We have an active executor but this offer comes from a
-        // different slave; decline the offer.
-        declineOffer(driver, offer);
+        // different agent; decline the offer.
+        decline(offer);
       }
     }
   }
 
-  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
+  void update(const TaskStatus& status)
   {
+    CHECK_EQ(SUBSCRIBED, state);
+
     LOG(INFO)
       << "Task " << status.task_id().value()
       << " is in state " << TaskState_Name(status.state())
       << (status.has_message() ? " with message: " + status.message() : "");
 
-    if (status.state() == TASK_KILLED ||
-        status.state() == TASK_LOST ||
-        status.state() == TASK_FAILED ||
-        status.state() == TASK_ERROR) {
+    if (status.has_uuid()) {
+      Call call;
+      call.set_type(Call::ACKNOWLEDGE);
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+
+      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+      acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
+      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+      acknowledge->set_uuid(status.uuid());
+
+      mesos->send(call);
+    }
+
+    if (status.state() == TaskState::TASK_KILLED ||
+        status.state() == TaskState::TASK_LOST ||
+        status.state() == TaskState::TASK_FAILED ||
+        status.state() == TaskState::TASK_ERROR) {
       ++metrics.abnormal_terminations;
     }
   }
 
-  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& _slaveId)
+  void agentFailed(const AgentID& _agentId)
   {
-    LOG(INFO) << "Slave lost: " << _slaveId;
+    CHECK_EQ(SUBSCRIBED, state);
+
+    LOG(INFO) << "Agent lost: " << _agentId;
 
-    if (slaveId == _slaveId) {
-      slaveId = None();
+    if (agentId == _agentId) {
+      agentId = None();
     }
   }
 
-  virtual void executorLost(SchedulerDriver* driver,
-                            const ExecutorID& executorId,
-                            const SlaveID& _slaveId,
-                            int status)
+  void executorFailed(
+      const ExecutorID& executorId,
+      const AgentID& _agentId,
+      const Option<int>& status)
   {
+    CHECK_EQ(SUBSCRIBED, state);
+
     LOG(INFO)
-      << "Executor '" << executorId << "' lost on slave "
-      << _slaveId << " with status: " << status;
+      << "Executor '" << executorId << "' lost on agent '" << _agentId
+      << (status.isSome() ? "' with status: " + stringify(status.get()) : "");
 
-    slaveId = None();
+    agentId = None();
   }
 
 private:
   // Helper to decline an offer.
-  void declineOffer(SchedulerDriver* driver, const Offer& offer)
+  void decline(const Offer& offer)
   {
     Filters filters;
     filters.set_refuse_seconds(600);
 
-    driver->declineOffer(offer.id(), filters);
+    Call call;
+    call.set_type(Call::DECLINE);
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(offer.id());
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos->send(call);
   }
 
   // Helper to launch a task using an offer.
-  void launchTask(SchedulerDriver* driver, const Offer& offer)
+  void launch(const Offer& offer)
   {
     int taskId = tasksLaunched++;
 
     TaskInfo task;
     task.set_name("Task " + stringify(taskId));
     task.mutable_task_id()->set_value(stringify(taskId));
-    task.mutable_slave_id()->MergeFrom(offer.slave_id());
+    task.mutable_agent_id()->MergeFrom(offer.agent_id());
     task.mutable_resources()->CopyFrom(taskResources);
     task.mutable_executor()->CopyFrom(executor);
 
-    driver->launchTasks(offer.id(), {task});
+    Call call;
+    call.set_type(Call::ACCEPT);
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+
+    operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+
+    mesos->send(call);
   }
 
+  enum State
+  {
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED
+  } state;
+
+  const string master;
+  FrameworkInfo framework;
   const ExecutorInfo executor;
   const Resources taskResources;
   string uri;
   int tasksLaunched;
 
-  // The slave that is running the long-lived-executor.
-  // Unless that slave/executor dies, this framework will not launch
-  // an executor on any other slave.
-  Option<SlaveID> slaveId;
+  // The agent that is running the long-lived-executor.
+  // Unless that agent/executor dies, this framework will not launch
+  // an executor on any other agent.
+  Option<AgentID> agentId;
+
+  Owned<Mesos> mesos;
 
   struct Metrics : process::Process<Metrics>
   {
@@ -386,57 +584,26 @@ int main(int argc, char** argv)
 
   // Copy `--executor_uri` into the command.
   if (flags.executor_uri.isSome()) {
-    mesos::CommandInfo::URI* uri = executor.mutable_command()->add_uris();
+    CommandInfo::URI* uri = executor.mutable_command()->add_uris();
     uri->set_value(flags.executor_uri.get());
     uri->set_executable(true);
   }
 
-  LongLivedScheduler scheduler(executor);
-
   FrameworkInfo framework;
   framework.set_user(os::user().get());
   framework.set_name("Long Lived Framework (C++)");
   framework.set_checkpoint(flags.checkpoint);
 
-  MesosSchedulerDriver* driver;
-
-  // TODO(josephw): Refactor these into a common set of flags.
-  if (os::getenv("MESOS_AUTHENTICATE").isSome()) {
-    LOG(INFO) << "Enabling authentication for the framework";
-
-    Option<string> value = os::getenv("DEFAULT_PRINCIPAL");
-    if (value.isNone()) {
-      EXIT(EXIT_FAILURE)
-        << "Expecting authentication principal in the environment";
-    }
-
-    Credential credential;
-    credential.set_principal(value.get());
-
-    framework.set_principal(value.get());
-
-    value = os::getenv("DEFAULT_SECRET");
-    if (value.isNone()) {
-      EXIT(EXIT_FAILURE)
-        << "Expecting authentication secret in the environment";
-    }
-
-    credential.set_secret(value.get());
-
-    driver = new MesosSchedulerDriver(
-        &scheduler, framework, flags.master.get(), credential);
-  } else {
-    framework.set_principal("long-lived-framework-cpp");
-
-    driver = new MesosSchedulerDriver(
-        &scheduler, framework, flags.master.get());
-  }
+  // TODO(anand): Add support for AuthN once MESOS-3923 is resolved.
 
-  int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
+  Owned<LongLivedScheduler> scheduler(
+      new LongLivedScheduler(
+        flags.master.get(),
+        framework,
+        executor));
 
-  // Ensure that the driver process terminates.
-  driver->stop();
+  process::spawn(scheduler.get());
+  process::wait(scheduler.get());
 
-  delete driver;
-  return status;
+  return EXIT_SUCCESS;
 }