You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/30 01:19:42 UTC
[6/6] mesos git commit: Moved command scheduler to use the scheduler
library.
Moved command scheduler to use the scheduler library.
This change moves the command scheduler (`mesos-execute`) to use
the scheduler library instead of the driver.
Review: https://reviews.apache.org/r/45008/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/250af60c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/250af60c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/250af60c
Branch: refs/heads/master
Commit: 250af60cf192146f9a942bce14b50e5107c81f6a
Parents: 85a6d0c
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 29 16:19:07 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 29 16:19:07 2016 -0700
----------------------------------------------------------------------
src/cli/execute.cpp | 268 ++++++++++++++++++++++++++++++++++++++---------
1 file changed, 221 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/250af60c/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index b28843c..af62f41 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -15,15 +15,20 @@
// limitations under the License.
#include <iostream>
+#include <queue>
#include <vector>
-#include <mesos/resources.hpp>
-#include <mesos/scheduler.hpp>
#include <mesos/type_utils.hpp>
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
+#include <process/protobuf.hpp>
#include <stout/check.hpp>
#include <stout/flags.hpp>
@@ -32,21 +37,39 @@
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
+#include <stout/unreachable.hpp>
#include "common/parse.hpp"
#include "common/protobuf_utils.hpp"
#include "hdfs/hdfs.hpp"
-using namespace mesos;
-using namespace mesos::internal;
+#include "internal/devolve.hpp"
using std::cerr;
using std::cout;
using std::endl;
+using std::queue;
using std::string;
using std::vector;
+using mesos::internal::devolve;
+
+using mesos::v1::CommandInfo;
+using mesos::v1::ContainerInfo;
+using mesos::v1::Environment;
+using mesos::v1::FrameworkID;
+using mesos::v1::FrameworkInfo;
+using mesos::v1::Image;
+using mesos::v1::Offer;
+using mesos::v1::Resources;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
using process::Future;
using process::Owned;
using process::UPID;
@@ -141,19 +164,24 @@ public:
};
-class CommandScheduler : public Scheduler
+class CommandScheduler : public process::Process<CommandScheduler>
{
public:
CommandScheduler(
+ const FrameworkInfo& _frameworkInfo,
+ const string& _master,
const string& _name,
- const bool& _shell,
+ const bool _shell,
const Option<string>& _command,
const Option<hashmap<string, string>>& _environment,
const string& _resources,
const Option<string>& _uri,
const Option<string>& _dockerImage,
const string& _containerizer)
- : name(_name),
+ : state(DISCONNECTED),
+ frameworkInfo(_frameworkInfo),
+ master(_master),
+ name(_name),
shell(_shell),
command(_command),
environment(_environment),
@@ -165,16 +193,63 @@ public:
virtual ~CommandScheduler() {}
- virtual void resourceOffers(
- SchedulerDriver* driver,
- const vector<Offer>& offers)
+protected:
+ virtual void initialize()
+ {
+ // We initialize the library here to ensure that callbacks are only invoked
+ // after the process has spawned.
+ mesos.reset(new Mesos(
+ master,
+ mesos::ContentType::PROTOBUF,
+ process::defer(self(), &Self::connected),
+ process::defer(self(), &Self::disconnected),
+ process::defer(self(), &Self::received, lambda::_1)));
+ }
+
+ void connected()
+ {
+ state = CONNECTED;
+
+ doReliableRegistration();
+ }
+
+ void disconnected()
+ {
+ state = DISCONNECTED;
+ }
+
+ void doReliableRegistration()
{
+ if (state == SUBSCRIBED || state == DISCONNECTED) {
+ return;
+ }
+
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ if (frameworkInfo.has_id()) {
+ call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+ }
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+ mesos->send(call);
+
+ process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+ }
+
+ void offers(const vector<Offer>& offers)
+ {
+ CHECK_EQ(SUBSCRIBED, state);
+
static const Try<Resources> TASK_RESOURCES = Resources::parse(resources);
if (TASK_RESOURCES.isError()) {
- cerr << "Failed to parse resources '" << resources
- << "': " << TASK_RESOURCES.error() << endl;
- driver->abort();
+ EXIT(EXIT_FAILURE)
+ << "Failed to parse resources '" << resources << "': "
+ << TASK_RESOURCES.error();
+
return;
}
@@ -184,7 +259,7 @@ public:
TaskInfo task;
task.set_name(name);
task.mutable_task_id()->set_value(name);
- task.mutable_slave_id()->MergeFrom(offer.slave_id());
+ task.mutable_agent_id()->MergeFrom(offer.agent_id());
task.mutable_resources()->CopyFrom(TASK_RESOURCES.get());
CommandInfo* commandInfo = task.mutable_command();
@@ -203,8 +278,9 @@ public:
Environment* environment_ = commandInfo->mutable_environment();
foreachpair (
const string& name, const string& value, environment.get()) {
- Environment_Variable* environmentVariable =
+ Environment::Variable* environmentVariable =
environment_->add_variables();
+
environmentVariable->set_name(name);
environmentVariable->set_value(value);
}
@@ -236,9 +312,8 @@ public:
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
} else {
- cerr << "Unsupported containerizer: " << containerizer << endl;;
-
- driver->abort();
+ EXIT(EXIT_FAILURE)
+ << "Unsupported containerizer: " << containerizer;
return;
}
@@ -246,33 +321,127 @@ public:
task.mutable_container()->CopyFrom(containerInfo);
}
- vector<TaskInfo> tasks;
- tasks.push_back(task);
+ Call call;
+ call.set_type(Call::ACCEPT);
+
+ CHECK(frameworkInfo.has_id());
+ call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
- driver->launchTasks(offer.id(), tasks);
- cout << "task " << name << " submitted to slave "
- << offer.slave_id() << endl;
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+
+ operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+
+ mesos->send(call);
+
+ cout << "task " << name << " submitted to agent "
+ << offer.agent_id() << endl;
launched = true;
} else {
- driver->declineOffer(offer.id());
+ Call call;
+ call.set_type(Call::DECLINE);
+
+ CHECK(frameworkInfo.has_id());
+ call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+
+ Call::Decline* decline = call.mutable_decline();
+ decline->add_offer_ids()->CopyFrom(offer.id());
+
+ mesos->send(call);
+ }
+ }
+ }
+
+  // Handles a batch of events delivered by the scheduler library.
+  //
+  // Events are drained from the front of `events` in order and dispatched
+  // on their type:
+  //   SUBSCRIBED: store the framework ID carried by the event and move to
+  //               the SUBSCRIBED state.
+  //   OFFERS:     forward the offers to `offers()`.
+  //   UPDATE:     forward the task status to `update()`.
+  //   ERROR:      exit with EXIT_FAILURE, logging the error message.
+  //   HEARTBEAT, FAILURE, RESCIND, MESSAGE: ignored.
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          frameworkInfo.mutable_id()->
+            CopyFrom(event.subscribed().framework_id());
+
+          state = SUBSCRIBED;
+
+          // Close the quote opened before the ID so the message is balanced.
+          cout << "Subscribed with ID '" << frameworkInfo.id() << "'" << endl;
+          break;
+        }
+
+        case Event::OFFERS: {
+          offers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::UPDATE: {
+          update(event.update().status());
+          break;
+        }
+
+        case Event::ERROR: {
+          EXIT(EXIT_FAILURE)
+            << "Received an ERROR event: " << event.error().message();
+
+          break;
+        }
+
+        case Event::HEARTBEAT:
+        case Event::FAILURE:
+        case Event::RESCIND:
+        case Event::MESSAGE: {
+          break;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
+      }
     }
   }
- virtual void statusUpdate(
- SchedulerDriver* driver,
- const TaskStatus& status)
+ void update(const TaskStatus& status)
{
+ CHECK_EQ(SUBSCRIBED, state);
CHECK_EQ(name, status.task_id().value());
+
cout << "Received status update " << status.state()
<< " for task " << status.task_id() << endl;
- if (mesos::internal::protobuf::isTerminalState(status.state())) {
- driver->stop();
+
+ if (status.has_uuid()) {
+ Call call;
+ call.set_type(Call::ACKNOWLEDGE);
+
+ CHECK(frameworkInfo.has_id());
+ call.mutable_framework_id()->CopyFrom(frameworkInfo.id());
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
+ acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+ acknowledge->set_uuid(status.uuid());
+
+ mesos->send(call);
+ }
+
+ if (mesos::internal::protobuf::isTerminalState(devolve(status).state())) {
+ terminate(self());
}
}
private:
+ enum State
+ {
+ DISCONNECTED,
+ CONNECTED,
+ SUBSCRIBED
+ } state;
+
+ FrameworkInfo frameworkInfo;
+ const string master;
const string name;
bool shell;
const Option<string> command;
@@ -282,6 +451,7 @@ private:
const Option<string> dockerImage;
const string containerizer;
bool launched;
+ Owned<Mesos> mesos;
};
@@ -406,22 +576,26 @@ int main(int argc, char** argv)
dockerImage = flags.docker_image.get();
}
- CommandScheduler scheduler(
- flags.name.get(),
- flags.shell,
- flags.command,
- environment,
- flags.resources,
- uri,
- dockerImage,
- flags.containerizer);
-
- FrameworkInfo framework;
- framework.set_user(user.get());
- framework.set_name("");
- framework.set_checkpoint(flags.checkpoint);
-
- MesosSchedulerDriver driver(&scheduler, framework, flags.master.get());
-
- return driver.run() == DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.set_user(user.get());
+ frameworkInfo.set_name("");
+ frameworkInfo.set_checkpoint(flags.checkpoint);
+
+ Owned<CommandScheduler> scheduler(
+ new CommandScheduler(
+ frameworkInfo,
+ flags.master.get(),
+ flags.name.get(),
+ flags.shell,
+ flags.command,
+ environment,
+ flags.resources,
+ uri,
+ dockerImage,
+ flags.containerizer));
+
+ process::spawn(scheduler.get());
+ process::wait(scheduler.get());
+
+ return EXIT_SUCCESS;
}