You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/30 01:19:42 UTC

[6/6] mesos git commit: Moved command scheduler to use the scheduler library.

Moved command scheduler to use the scheduler library.

This change moves the command scheduler (`mesos-execute`) to use
the scheduler library instead of the driver.

Review: https://reviews.apache.org/r/45008/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/250af60c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/250af60c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/250af60c

Branch: refs/heads/master
Commit: 250af60cf192146f9a942bce14b50e5107c81f6a
Parents: 85a6d0c
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 29 16:19:07 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 29 16:19:07 2016 -0700

----------------------------------------------------------------------
 src/cli/execute.cpp | 268 ++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 221 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/250af60c/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index b28843c..af62f41 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -15,15 +15,20 @@
 // limitations under the License.
 
 #include <iostream>
+#include <queue>
 #include <vector>
 
-#include <mesos/resources.hpp>
-#include <mesos/scheduler.hpp>
 #include <mesos/type_utils.hpp>
 
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <process/delay.hpp>
 #include <process/future.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
+#include <process/protobuf.hpp>
 
 #include <stout/check.hpp>
 #include <stout/flags.hpp>
@@ -32,21 +37,39 @@
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/unreachable.hpp>
 
 #include "common/parse.hpp"
 #include "common/protobuf_utils.hpp"
 
 #include "hdfs/hdfs.hpp"
 
-using namespace mesos;
-using namespace mesos::internal;
+#include "internal/devolve.hpp"
 
 using std::cerr;
 using std::cout;
 using std::endl;
+using std::queue;
 using std::string;
 using std::vector;
 
+using mesos::internal::devolve;
+
+using mesos::v1::CommandInfo;
+using mesos::v1::ContainerInfo;
+using mesos::v1::Environment;
+using mesos::v1::FrameworkID;
+using mesos::v1::FrameworkInfo;
+using mesos::v1::Image;
+using mesos::v1::Offer;
+using mesos::v1::Resources;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
 using process::Future;
 using process::Owned;
 using process::UPID;
@@ -141,19 +164,24 @@ public:
 };
 
 
-class CommandScheduler : public Scheduler
+class CommandScheduler : public process::Process<CommandScheduler>
 {
 public:
   CommandScheduler(
+      const FrameworkInfo& _frameworkInfo,
+      const string& _master,
       const string& _name,
-      const bool& _shell,
+      const bool _shell,
       const Option<string>& _command,
       const Option<hashmap<string, string>>& _environment,
       const string& _resources,
       const Option<string>& _uri,
       const Option<string>& _dockerImage,
       const string& _containerizer)
-    : name(_name),
+    : state(DISCONNECTED),
+      frameworkInfo(_frameworkInfo),
+      master(_master),
+      name(_name),
       shell(_shell),
       command(_command),
       environment(_environment),
@@ -165,16 +193,63 @@ public:
 
   virtual ~CommandScheduler() {}
 
-  virtual void resourceOffers(
-      SchedulerDriver* driver,
-      const vector<Offer>& offers)
+protected:
+  virtual void initialize()
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new Mesos(
+      master,
+      mesos::ContentType::PROTOBUF,
+      process::defer(self(), &Self::connected),
+      process::defer(self(), &Self::disconnected),
+      process::defer(self(), &Self::received, lambda::_1)));
+  }
+
+  void connected()
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  void doReliableRegistration()
   {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    if (frameworkInfo.has_id()) {
+      call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+    }
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void offers(const vector<Offer>& offers)
+  {
+    CHECK_EQ(SUBSCRIBED, state);
+
     static const Try<Resources> TASK_RESOURCES = Resources::parse(resources);
 
     if (TASK_RESOURCES.isError()) {
-      cerr << "Failed to parse resources '" << resources
-           << "': " << TASK_RESOURCES.error() << endl;
-      driver->abort();
+      EXIT(EXIT_FAILURE)
+        << "Failed to parse resources '" << resources << "': "
+        << TASK_RESOURCES.error();
+
       return;
     }
 
@@ -184,7 +259,7 @@ public:
         TaskInfo task;
         task.set_name(name);
         task.mutable_task_id()->set_value(name);
-        task.mutable_slave_id()->MergeFrom(offer.slave_id());
+        task.mutable_agent_id()->MergeFrom(offer.agent_id());
         task.mutable_resources()->CopyFrom(TASK_RESOURCES.get());
 
         CommandInfo* commandInfo = task.mutable_command();
@@ -203,8 +278,9 @@ public:
           Environment* environment_ = commandInfo->mutable_environment();
           foreachpair (
               const string& name, const string& value, environment.get()) {
-            Environment_Variable* environmentVariable =
+            Environment::Variable* environmentVariable =
               environment_->add_variables();
+
             environmentVariable->set_name(name);
             environmentVariable->set_value(value);
           }
@@ -236,9 +312,8 @@ public:
 
             containerInfo.mutable_docker()->CopyFrom(dockerInfo);
           } else {
-            cerr << "Unsupported containerizer: " << containerizer << endl;;
-
-            driver->abort();
+            EXIT(EXIT_FAILURE)
+              << "Unsupported containerizer: " << containerizer;
 
             return;
           }
@@ -246,33 +321,127 @@ public:
           task.mutable_container()->CopyFrom(containerInfo);
         }
 
-        vector<TaskInfo> tasks;
-        tasks.push_back(task);
+        Call call;
+        call.set_type(Call::ACCEPT);
+
+        CHECK(frameworkInfo.has_id());
+        call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+
+        Call::Accept* accept = call.mutable_accept();
+        accept->add_offer_ids()->CopyFrom(offer.id());
 
-        driver->launchTasks(offer.id(), tasks);
-        cout << "task " << name << " submitted to slave "
-             << offer.slave_id() << endl;
+        Offer::Operation* operation = accept->add_operations();
+        operation->set_type(Offer::Operation::LAUNCH);
+
+        operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+
+        mesos->send(call);
+
+        cout << "task " << name << " submitted to agent "
+             << offer.agent_id() << endl;
 
         launched = true;
       } else {
-        driver->declineOffer(offer.id());
+        Call call;
+        call.set_type(Call::DECLINE);
+
+        CHECK(frameworkInfo.has_id());
+        call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+
+        Call::Decline* decline = call.mutable_decline();
+        decline->add_offer_ids()->CopyFrom(offer.id());
+
+        mesos->send(call);
+      }
+    }
+  }
+
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          frameworkInfo.mutable_id()->
+            CopyFrom(event.subscribed().framework_id());
+
+          state = SUBSCRIBED;
+
+          cout << "Subscribed with ID '" << frameworkInfo.id() << endl;
+          break;
+        }
+
+        case Event::OFFERS: {
+          offers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::UPDATE: {
+          update(event.update().status());
+          break;
+        }
+
+        case Event::ERROR: {
+          EXIT(EXIT_FAILURE)
+            << "Received an ERROR event: " << event.error().message();
+
+          break;
+        }
+
+        case Event::HEARTBEAT:
+        case Event::FAILURE:
+        case Event::RESCIND:
+        case Event::MESSAGE: {
+          break;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
       }
     }
   }
 
-  virtual void statusUpdate(
-      SchedulerDriver* driver,
-      const TaskStatus& status)
+  void update(const TaskStatus& status)
   {
+    CHECK_EQ(SUBSCRIBED, state);
     CHECK_EQ(name, status.task_id().value());
+
     cout << "Received status update " << status.state()
          << " for task " << status.task_id() << endl;
-    if (mesos::internal::protobuf::isTerminalState(status.state())) {
-      driver->stop();
+
+    if (status.has_uuid()) {
+      Call call;
+      call.set_type(Call::ACKNOWLEDGE);
+
+      CHECK(frameworkInfo.has_id());
+      call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+
+      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+      acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
+      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+      acknowledge->set_uuid(status.uuid());
+
+      mesos->send(call);
+    }
+
+    if (mesos::internal::protobuf::isTerminalState(devolve(status).state())) {
+      terminate(self());
     }
   }
 
 private:
+  enum State
+  {
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED
+  } state;
+
+  FrameworkInfo frameworkInfo;
+  const string master;
   const string name;
   bool shell;
   const Option<string> command;
@@ -282,6 +451,7 @@ private:
   const Option<string> dockerImage;
   const string containerizer;
   bool launched;
+  Owned<Mesos> mesos;
 };
 
 
@@ -406,22 +576,26 @@ int main(int argc, char** argv)
     dockerImage = flags.docker_image.get();
   }
 
-  CommandScheduler scheduler(
-      flags.name.get(),
-      flags.shell,
-      flags.command,
-      environment,
-      flags.resources,
-      uri,
-      dockerImage,
-      flags.containerizer);
-
-  FrameworkInfo framework;
-  framework.set_user(user.get());
-  framework.set_name("");
-  framework.set_checkpoint(flags.checkpoint);
-
-  MesosSchedulerDriver driver(&scheduler, framework, flags.master.get());
-
-  return driver.run() == DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user(user.get());
+  frameworkInfo.set_name("");
+  frameworkInfo.set_checkpoint(flags.checkpoint);
+
+  Owned<CommandScheduler> scheduler(
+      new CommandScheduler(
+        frameworkInfo,
+        flags.master.get(),
+        flags.name.get(),
+        flags.shell,
+        flags.command,
+        environment,
+        flags.resources,
+        uri,
+        dockerImage,
+        flags.containerizer));
+
+  process::spawn(scheduler.get());
+  process::wait(scheduler.get());
+
+  return EXIT_SUCCESS;
 }