You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/10 18:28:55 UTC
[3/5] mesos git commit: Made the command executor use the unversioned
protobufs internally.
Made the command executor use the unversioned protobufs internally.
Currently, the existing command executor was in the `v1` namespace
and used the v1 protobufs. This change addresses that and moves the
code to the `mesos::internal` namespace.
Review: https://reviews.apache.org/r/50411/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/64842e4c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/64842e4c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/64842e4c
Branch: refs/heads/master
Commit: 64842e4cdc24900f586f87bb645b6d6d88da4804
Parents: 7070d1e
Author: Anand Mazumdar <an...@apache.org>
Authored: Wed Aug 10 11:04:02 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Wed Aug 10 11:04:02 2016 -0700
----------------------------------------------------------------------
src/launcher/executor.cpp | 180 +++++++++++++++++------------------
src/launcher/posix/executor.cpp | 9 +-
src/launcher/posix/executor.hpp | 6 +-
3 files changed, 91 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 52d20af..0e23d4f 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -28,9 +28,6 @@
#include <mesos/mesos.hpp>
-#include <mesos/v1/executor.hpp>
-#include <mesos/v1/mesos.hpp>
-
#include <mesos/type_utils.hpp>
#include <process/clock.hpp>
@@ -110,25 +107,14 @@ using process::Subprocess;
using process::Time;
using process::Timer;
-using mesos::internal::devolve;
-using mesos::internal::evolve;
-using mesos::internal::HealthChecker;
-using mesos::internal::TaskHealthStatus;
-
-using mesos::internal::protobuf::frameworkHasCapability;
+using mesos::executor::Call;
+using mesos::executor::Event;
-using mesos::v1::ExecutorID;
-using mesos::v1::FrameworkID;
-
-using mesos::v1::executor::Call;
-using mesos::v1::executor::Event;
using mesos::v1::executor::Mesos;
using mesos::v1::executor::MesosBase;
using mesos::v1::executor::V0ToV1Adapter;
-
namespace mesos {
-namespace v1 {
namespace internal {
class CommandExecutor: public ProtobufProcess<CommandExecutor>
@@ -190,65 +176,60 @@ public:
state = DISCONNECTED;
}
- void received(queue<Event> events)
+ void received(const Event& event)
{
- while (!events.empty()) {
- Event event = events.front();
- events.pop();
-
- cout << "Received " << event.type() << " event" << endl;
-
- switch (event.type()) {
- case Event::SUBSCRIBED: {
- cout << "Subscribed executor on "
- << event.subscribed().agent_info().hostname() << endl;
-
- frameworkInfo = event.subscribed().framework_info();
- state = SUBSCRIBED;
- break;
- }
-
- case Event::LAUNCH: {
- launch(event.launch().task());
- break;
- }
-
- case Event::KILL: {
- Option<KillPolicy> override = event.kill().has_kill_policy()
- ? Option<KillPolicy>(event.kill().kill_policy())
- : None();
-
- kill(event.kill().task_id(), override);
- break;
- }
-
- case Event::ACKNOWLEDGED: {
- // Remove the corresponding update.
- updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get());
-
- // Remove the corresponding task.
- task = None();
- break;
- }
-
- case Event::SHUTDOWN: {
- shutdown();
- break;
- }
-
- case Event::MESSAGE: {
- break;
- }
-
- case Event::ERROR: {
- cerr << "Error: " << event.error().message() << endl;
- break;
- }
-
- case Event::UNKNOWN: {
- LOG(WARNING) << "Received an UNKNOWN event and ignored";
- break;
- }
+ cout << "Received " << event.type() << " event" << endl;
+
+ switch (event.type()) {
+ case Event::SUBSCRIBED: {
+ cout << "Subscribed executor on "
+ << event.subscribed().slave_info().hostname() << endl;
+
+ frameworkInfo = event.subscribed().framework_info();
+ state = SUBSCRIBED;
+ break;
+ }
+
+ case Event::LAUNCH: {
+ launch(event.launch().task());
+ break;
+ }
+
+ case Event::KILL: {
+ Option<KillPolicy> override = event.kill().has_kill_policy()
+ ? Option<KillPolicy>(event.kill().kill_policy())
+ : None();
+
+ kill(event.kill().task_id(), override);
+ break;
+ }
+
+ case Event::ACKNOWLEDGED: {
+ // Remove the corresponding update.
+ updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get());
+
+ // Remove the corresponding task.
+ task = None();
+ break;
+ }
+
+ case Event::SHUTDOWN: {
+ shutdown();
+ break;
+ }
+
+ case Event::MESSAGE: {
+ break;
+ }
+
+ case Event::ERROR: {
+ cerr << "Error: " << event.error().message() << endl;
+ break;
+ }
+
+ case Event::UNKNOWN: {
+ LOG(WARNING) << "Received an UNKNOWN event and ignored";
+ break;
}
}
}
@@ -271,31 +252,45 @@ protected:
// after the process has spawned.
if (value.isSome() && value.get() == "1") {
mesos.reset(new Mesos(
- mesos::ContentType::PROTOBUF,
+ ContentType::PROTOBUF,
defer(self(), &Self::connected),
defer(self(), &Self::disconnected),
- defer(self(), &Self::received, lambda::_1)));
+ defer(self(), [this](queue<v1::executor::Event> events) {
+ while(!events.empty()) {
+ const v1::executor::Event& event = events.front();
+ received(devolve(event));
+
+ events.pop();
+ }
+ })));
} else {
mesos.reset(new V0ToV1Adapter(
defer(self(), &Self::connected),
defer(self(), &Self::disconnected),
- defer(self(), &Self::received, lambda::_1)));
+ defer(self(), [this](queue<v1::executor::Event> events) {
+ while(!events.empty()) {
+ const v1::executor::Event& event = events.front();
+ received(devolve(event));
+
+ events.pop();
+ }
+ })));
}
}
void taskHealthUpdated(
- const mesos::TaskID& taskID,
+ const TaskID& taskID,
const bool healthy,
const bool initiateTaskKill)
{
cout << "Received task health update, healthy: "
<< stringify(healthy) << endl;
- update(evolve(taskID), TASK_RUNNING, healthy);
+ update(taskID, TASK_RUNNING, healthy);
if (initiateTaskKill) {
killedByHealthCheck = true;
- kill(evolve(taskID));
+ kill(taskID);
}
}
@@ -323,7 +318,7 @@ protected:
subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
}
- mesos->send(call);
+ mesos->send(evolve(call));
delay(Seconds(1), self(), &Self::doReliableRegistration);
}
@@ -362,7 +357,7 @@ protected:
ABORT("Failed to parse JSON: " + object.error());
}
- Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+ Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(object.get());
if (parse.isError()) {
ABORT("Failed to parse protobuf: " + parse.error());
}
@@ -418,9 +413,9 @@ protected:
if (task->has_health_check()) {
Try<Owned<HealthChecker>> _checker = HealthChecker::create(
- devolve(task->health_check()),
+ task->health_check(),
self(),
- devolve(task->task_id()));
+ task->task_id());
if (_checker.isError()) {
// TODO(gilbert): Consider ABORT and return a TASK_FAILED here.
@@ -559,9 +554,9 @@ private:
CHECK_SOME(taskId);
CHECK(taskId.get() == _taskId);
- if (frameworkHasCapability(
- devolve(frameworkInfo.get()),
- mesos::FrameworkInfo::Capability::TASK_KILLING_STATE)) {
+ if (protobuf::frameworkHasCapability(
+ frameworkInfo.get(),
+ FrameworkInfo::Capability::TASK_KILLING_STATE)) {
update(taskId.get(), TASK_KILLING);
}
@@ -714,7 +709,7 @@ private:
// Capture the status update.
updates[uuid] = call.update();
- mesos->send(call);
+ mesos->send(evolve(call));
}
enum State
@@ -757,7 +752,6 @@ private:
};
} // namespace internal {
-} // namespace v1 {
} // namespace mesos {
@@ -811,8 +805,8 @@ public:
int main(int argc, char** argv)
{
Flags flags;
- FrameworkID frameworkId;
- ExecutorID executorId;
+ mesos::FrameworkID frameworkId;
+ mesos::ExecutorID executorId;
#ifdef __WINDOWS__
process::Winsock winsock;
@@ -869,8 +863,8 @@ int main(int argc, char** argv)
shutdownGracePeriod = parse.get();
}
- Owned<mesos::v1::internal::CommandExecutor> executor(
- new mesos::v1::internal::CommandExecutor(
+ Owned<mesos::internal::CommandExecutor> executor(
+ new mesos::internal::CommandExecutor(
flags.launcher_dir,
flags.rootfs,
flags.sandbox_directory,
http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/posix/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/posix/executor.cpp b/src/launcher/posix/executor.cpp
index 6814b9f..43573ca 100644
--- a/src/launcher/posix/executor.cpp
+++ b/src/launcher/posix/executor.cpp
@@ -24,8 +24,6 @@
#include <stout/os/raw/argv.hpp>
-#include "internal/devolve.hpp"
-
#include "launcher/posix/executor.hpp"
#ifdef __linux__
@@ -48,16 +46,14 @@ using std::endl;
using std::string;
using std::vector;
-using mesos::internal::devolve;
using mesos::internal::slave::MESOS_CONTAINERIZER;
using mesos::internal::slave::MesosContainerizerLaunch;
namespace mesos {
-namespace v1 {
namespace internal {
pid_t launchTaskPosix(
- const mesos::v1::CommandInfo& command,
+ const CommandInfo& command,
const string& launcherDir,
const Option<string>& user,
const Option<string>& rootfs,
@@ -85,7 +81,7 @@ pid_t launchTaskPosix(
// Prepare the flags to pass to the launch process.
MesosContainerizerLaunch::Flags launchFlags;
- launchFlags.command = JSON::protobuf(devolve(command));
+ launchFlags.command = JSON::protobuf(command);
if (rootfs.isSome()) {
CHECK_SOME(sandboxDirectory);
@@ -127,5 +123,4 @@ pid_t launchTaskPosix(
}
} // namespace internal {
-} // namespace v1 {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/posix/executor.hpp
----------------------------------------------------------------------
diff --git a/src/launcher/posix/executor.hpp b/src/launcher/posix/executor.hpp
index a508089..9e46726 100644
--- a/src/launcher/posix/executor.hpp
+++ b/src/launcher/posix/executor.hpp
@@ -19,16 +19,15 @@
#include <string>
-#include <mesos/v1/mesos.hpp>
+#include <mesos/mesos.hpp>
#include <stout/option.hpp>
namespace mesos {
-namespace v1 {
namespace internal {
pid_t launchTaskPosix(
- const mesos::v1::CommandInfo& command,
+ const CommandInfo& command,
const std::string& launcherDir,
const Option<std::string>& user,
const Option<std::string>& rootfs,
@@ -36,7 +35,6 @@ pid_t launchTaskPosix(
const Option<std::string>& workingDirectory);
} // namespace internal {
-} // namespace v1 {
} // namespace mesos {
#endif // __LAUNCHER_POSIX_EXECUTOR_HPP__