You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/07 22:32:50 UTC
[1/4] mesos git commit: Fixed a memory leak in long lived executor.
Repository: mesos
Updated Branches:
refs/heads/master c5bbd5af0 -> 7ca296cfc
Fixed a memory leak in long lived executor.
Instead of allocating the thread object on the heap,
modified code to make it a stack allocated object.
Review: https://reviews.apache.org/r/45795/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8d6c98b2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8d6c98b2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8d6c98b2
Branch: refs/heads/master
Commit: 8d6c98b2c9a442a4a7ee76ff404a4191e66ac004
Parents: c5bbd5a
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:24 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:24 2016 -0700
----------------------------------------------------------------------
src/examples/long_lived_executor.cpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8d6c98b2/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index f0ba896..5377865 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -70,11 +70,11 @@ public:
{
cout << "Starting task " << task.task_id().value() << endl;
- std::thread* thread = new std::thread([=]() {
+ std::thread thread([=]() {
run(driver, task);
});
- thread->detach();
+ thread.detach();
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
[2/4] mesos git commit: Deleted the `run` method in long lived
executor.
Posted by vi...@apache.org.
Deleted the `run` method in long lived executor.
Review: https://reviews.apache.org/r/45796/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0b17319
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0b17319
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0b17319
Branch: refs/heads/master
Commit: a0b173193f5d508affc5f05513602a6ef98527d9
Parents: 8d6c98b
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:28 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:28 2016 -0700
----------------------------------------------------------------------
src/examples/long_lived_executor.cpp | 20 +++++++-------------
1 file changed, 7 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0b17319/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index 5377865..efd6c65 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -33,18 +33,6 @@ using std::endl;
using std::string;
-void run(ExecutorDriver* driver, const TaskInfo& task)
-{
- os::sleep(Seconds(random() % 10));
-
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_FINISHED);
-
- driver->sendStatusUpdate(status);
-}
-
-
class LongLivedExecutor : public Executor
{
public:
@@ -71,7 +59,13 @@ public:
cout << "Starting task " << task.task_id().value() << endl;
std::thread thread([=]() {
- run(driver, task);
+ os::sleep(Seconds(random() % 10));
+
+ TaskStatus status;
+ status.mutable_task_id()->MergeFrom(task.task_id());
+ status.set_state(TASK_FINISHED);
+
+ driver->sendStatusUpdate(status);
});
thread.detach();
[3/4] mesos git commit: Cleaned up the virtual overloads in long
lived executor.
Posted by vi...@apache.org.
Cleaned up the virtual overloads in long lived executor.
This change removes the unused methods that won't be needed
after moving to the v1 API. It also renames `launchTask` to `launch`.
This review mainly eases reviewing later in the chain and should not
be committed on its own.
Review: https://reviews.apache.org/r/45797/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ff3bf416
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ff3bf416
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ff3bf416
Branch: refs/heads/master
Commit: ff3bf416fcf7d7fa965d730cac9e5a35e812b4c2
Parents: a0b1731
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:32 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:32 2016 -0700
----------------------------------------------------------------------
src/examples/long_lived_executor.cpp | 25 +------------------------
1 file changed, 1 insertion(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ff3bf416/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index efd6c65..53be40b 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -36,25 +36,7 @@ using std::string;
class LongLivedExecutor : public Executor
{
public:
- virtual ~LongLivedExecutor() {}
-
- virtual void registered(ExecutorDriver* driver,
- const ExecutorInfo& executorInfo,
- const FrameworkInfo& frameworkInfo,
- const SlaveInfo& slaveInfo)
- {
- cout << "Registered executor on " << slaveInfo.hostname() << endl;
- }
-
- virtual void reregistered(ExecutorDriver* driver,
- const SlaveInfo& slaveInfo)
- {
- cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
- }
-
- virtual void disconnected(ExecutorDriver* driver) {}
-
- virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+ virtual void launch(ExecutorDriver* driver, const TaskInfo& task)
{
cout << "Starting task " << task.task_id().value() << endl;
@@ -76,11 +58,6 @@ public:
driver->sendStatusUpdate(status);
}
-
- virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) {}
- virtual void frameworkMessage(ExecutorDriver* driver, const string& data) {}
- virtual void shutdown(ExecutorDriver* driver) {}
- virtual void error(ExecutorDriver* driver, const string& message) {}
};
[4/4] mesos git commit: Move long lived executor to use the v1 API.
Posted by vi...@apache.org.
Move long lived executor to use the v1 API.
Review: https://reviews.apache.org/r/45798/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ca296cf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ca296cf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ca296cf
Branch: refs/heads/master
Commit: 7ca296cfc0fb88dfbed9f84494a4566c31ead90a
Parents: ff3bf41
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:37 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:37 2016 -0700
----------------------------------------------------------------------
src/examples/long_lived_executor.cpp | 225 +++++++++++++++++++++++++++---
1 file changed, 206 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7ca296cf/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index 53be40b..ebb9cbf 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -18,52 +18,239 @@
#include <cstdlib>
#include <iostream>
+#include <queue>
#include <thread>
-#include <mesos/executor.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
#include <stout/duration.hpp>
-#include <stout/lambda.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/option.hpp>
#include <stout/os.hpp>
+#include <stout/uuid.hpp>
-using namespace mesos;
-
+using std::cerr;
using std::cout;
using std::endl;
+using std::queue;
using std::string;
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
-class LongLivedExecutor : public Executor
+class LongLivedExecutor : public process::Process<LongLivedExecutor>
{
public:
- virtual void launch(ExecutorDriver* driver, const TaskInfo& task)
+ LongLivedExecutor(
+ const FrameworkID& _frameworkId,
+ const ExecutorID& _executorId)
+ : frameworkId(_frameworkId),
+ executorId(_executorId),
+ state(DISCONNECTED) {}
+
+ virtual ~LongLivedExecutor() = default;
+
+protected:
+ virtual void initialize()
+ {
+ // We initialize the library here to ensure that callbacks are only invoked
+ // after the process has spawned.
+ mesos.reset(new Mesos(
+ mesos::ContentType::PROTOBUF,
+ process::defer(self(), &Self::connected),
+ process::defer(self(), &Self::disconnected),
+ process::defer(self(), &Self::received, lambda::_1)));
+ }
+
+ void connected()
+ {
+ state = CONNECTED;
+
+ doReliableRegistration();
+ }
+
+ void disconnected()
+ {
+ state = DISCONNECTED;
+ }
+
+ void received(queue<Event> events)
+ {
+ while (!events.empty()) {
+ Event event = events.front();
+ events.pop();
+
+ cout << "Received " << event.type() << " event" << endl;
+
+ switch (event.type()) {
+ case Event::SUBSCRIBED: {
+ cout << "Subscribed executor on "
+ << event.subscribed().agent_info().hostname() << endl;
+
+ state = SUBSCRIBED;
+ break;
+ }
+
+ case Event::LAUNCH: {
+ launch(event.launch().task());
+ break;
+ }
+
+ case Event::ACKNOWLEDGED: {
+ // Remove the corresponding update.
+ updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+ // Remove the corresponding task.
+ tasks.erase(event.acknowledged().task_id());
+ break;
+ }
+
+ case Event::ERROR: {
+ cerr << "Error: " << event.error().message() << endl;
+ break;
+ }
+
+ case Event::KILL:
+ case Event::MESSAGE:
+ case Event::SHUTDOWN: {
+ break;
+ }
+
+ default: {
+ UNREACHABLE();
+ }
+ }
+ }
+ }
+
+ void doReliableRegistration()
+ {
+ if (state == SUBSCRIBED || state == DISCONNECTED) {
+ return;
+ }
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ // Send all unacknowledged updates.
+ foreach (const Call::Update& update, updates.values()) {
+ subscribe->add_unacknowledged_updates()->MergeFrom(update);
+ }
+
+ // Send all unacknowledged tasks.
+ foreach (const TaskInfo& task, tasks.values()) {
+ subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+ }
+
+ mesos->send(call);
+
+ process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+ }
+
+ void update(const TaskInfo& task, const TaskState& state)
+ {
+ UUID uuid = UUID::random();
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.mutable_executor_id()->CopyFrom(executorId);
+ status.set_state(state);
+ status.set_source(TaskStatus::SOURCE_EXECUTOR);
+ status.set_uuid(uuid.toBytes());
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.set_type(Call::UPDATE);
+
+ call.mutable_update()->mutable_status()->CopyFrom(status);
+
+ // Capture the status update.
+ updates[uuid] = call.update();
+
+ mesos->send(call);
+ }
+
+ void launch(const TaskInfo& task)
{
cout << "Starting task " << task.task_id().value() << endl;
+ tasks[task.task_id()] = task;
+
std::thread thread([=]() {
os::sleep(Seconds(random() % 10));
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_FINISHED);
-
- driver->sendStatusUpdate(status);
+ update(task, TaskState::TASK_FINISHED);
});
thread.detach();
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_RUNNING);
-
- driver->sendStatusUpdate(status);
+ update(task, TaskState::TASK_RUNNING);
}
+
+private:
+ const FrameworkID frameworkId;
+ const ExecutorID executorId;
+ process::Owned<Mesos> mesos;
+ enum State
+ {
+ CONNECTED,
+ DISCONNECTED,
+ SUBSCRIBED
+ } state;
+
+ LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+ LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
};
int main(int argc, char** argv)
{
- LongLivedExecutor executor;
- MesosExecutorDriver driver(&executor);
- return driver.run() == DRIVER_STOPPED ? 0 : 1;
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+
+ Option<string> value;
+
+ value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+ }
+ frameworkId.set_value(value.get());
+
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+ }
+ executorId.set_value(value.get());
+
+ process::Owned<LongLivedExecutor> executor(
+ new LongLivedExecutor(frameworkId, executorId));
+
+ process::spawn(executor.get());
+ process::wait(executor.get());
+
+ return EXIT_SUCCESS;
}