You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/07 22:32:50 UTC

[1/4] mesos git commit: Fixed a memory leak in long lived executor.

Repository: mesos
Updated Branches:
  refs/heads/master c5bbd5af0 -> 7ca296cfc


Fixed a memory leak in long lived executor.

Instead of allocating the thread object on the heap,
modified code to make it a stack allocated object.

Review: https://reviews.apache.org/r/45795/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8d6c98b2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8d6c98b2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8d6c98b2

Branch: refs/heads/master
Commit: 8d6c98b2c9a442a4a7ee76ff404a4191e66ac004
Parents: c5bbd5a
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:24 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:24 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_executor.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8d6c98b2/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index f0ba896..5377865 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -70,11 +70,11 @@ public:
   {
     cout << "Starting task " << task.task_id().value() << endl;
 
-    std::thread* thread = new std::thread([=]() {
+    std::thread thread([=]() {
       run(driver, task);
     });
 
-    thread->detach();
+    thread.detach();
 
     TaskStatus status;
     status.mutable_task_id()->MergeFrom(task.task_id());


[2/4] mesos git commit: Deleted the `run` method in long lived executor.

Posted by vi...@apache.org.
Deleted the `run` method in long lived executor.

Review: https://reviews.apache.org/r/45796/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0b17319
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0b17319
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0b17319

Branch: refs/heads/master
Commit: a0b173193f5d508affc5f05513602a6ef98527d9
Parents: 8d6c98b
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:28 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:28 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_executor.cpp | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a0b17319/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index 5377865..efd6c65 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -33,18 +33,6 @@ using std::endl;
 using std::string;
 
 
-void run(ExecutorDriver* driver, const TaskInfo& task)
-{
-  os::sleep(Seconds(random() % 10));
-
-  TaskStatus status;
-  status.mutable_task_id()->MergeFrom(task.task_id());
-  status.set_state(TASK_FINISHED);
-
-  driver->sendStatusUpdate(status);
-}
-
-
 class LongLivedExecutor : public Executor
 {
 public:
@@ -71,7 +59,13 @@ public:
     cout << "Starting task " << task.task_id().value() << endl;
 
     std::thread thread([=]() {
-      run(driver, task);
+      os::sleep(Seconds(random() % 10));
+
+      TaskStatus status;
+      status.mutable_task_id()->MergeFrom(task.task_id());
+      status.set_state(TASK_FINISHED);
+
+      driver->sendStatusUpdate(status);
     });
 
     thread.detach();


[3/4] mesos git commit: Cleaned up the virtual overloads in long lived executor.

Posted by vi...@apache.org.
Cleaned up the virtual overloads in long lived executor.

This change removes the unused methods that won't be needed
after moving to the v1 API. It also renames `launchTask` to `launch`.
This review mainly eases reviewing later in the chain and should not
be committed on its own.

Review: https://reviews.apache.org/r/45797/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ff3bf416
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ff3bf416
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ff3bf416

Branch: refs/heads/master
Commit: ff3bf416fcf7d7fa965d730cac9e5a35e812b4c2
Parents: a0b1731
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:32 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:32 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_executor.cpp | 25 +------------------------
 1 file changed, 1 insertion(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ff3bf416/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index efd6c65..53be40b 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -36,25 +36,7 @@ using std::string;
 class LongLivedExecutor : public Executor
 {
 public:
-  virtual ~LongLivedExecutor() {}
-
-  virtual void registered(ExecutorDriver* driver,
-                          const ExecutorInfo& executorInfo,
-                          const FrameworkInfo& frameworkInfo,
-                          const SlaveInfo& slaveInfo)
-  {
-    cout << "Registered executor on " << slaveInfo.hostname() << endl;
-  }
-
-  virtual void reregistered(ExecutorDriver* driver,
-                            const SlaveInfo& slaveInfo)
-  {
-    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
-  }
-
-  virtual void disconnected(ExecutorDriver* driver) {}
-
-  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  virtual void launch(ExecutorDriver* driver, const TaskInfo& task)
   {
     cout << "Starting task " << task.task_id().value() << endl;
 
@@ -76,11 +58,6 @@ public:
 
     driver->sendStatusUpdate(status);
   }
-
-  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) {}
-  virtual void frameworkMessage(ExecutorDriver* driver, const string& data) {}
-  virtual void shutdown(ExecutorDriver* driver) {}
-  virtual void error(ExecutorDriver* driver, const string& message) {}
 };
 
 


[4/4] mesos git commit: Move long lived executor to use the v1 API.

Posted by vi...@apache.org.
Move long lived executor to use the v1 API.

Review: https://reviews.apache.org/r/45798/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ca296cf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ca296cf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ca296cf

Branch: refs/heads/master
Commit: 7ca296cfc0fb88dfbed9f84494a4566c31ead90a
Parents: ff3bf41
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Apr 7 13:32:37 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Apr 7 13:32:37 2016 -0700

----------------------------------------------------------------------
 src/examples/long_lived_executor.cpp | 225 +++++++++++++++++++++++++++---
 1 file changed, 206 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ca296cf/src/examples/long_lived_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_executor.cpp b/src/examples/long_lived_executor.cpp
index 53be40b..ebb9cbf 100644
--- a/src/examples/long_lived_executor.cpp
+++ b/src/examples/long_lived_executor.cpp
@@ -18,52 +18,239 @@
 
 #include <cstdlib>
 #include <iostream>
+#include <queue>
 #include <thread>
 
-#include <mesos/executor.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
 
 #include <stout/duration.hpp>
-#include <stout/lambda.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/uuid.hpp>
 
-using namespace mesos;
-
+using std::cerr;
 using std::cout;
 using std::endl;
+using std::queue;
 using std::string;
 
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
 
-class LongLivedExecutor : public Executor
+class LongLivedExecutor : public process::Process<LongLivedExecutor>
 {
 public:
-  virtual void launch(ExecutorDriver* driver, const TaskInfo& task)
+  LongLivedExecutor(
+      const FrameworkID& _frameworkId,
+      const ExecutorID& _executorId)
+    : frameworkId(_frameworkId),
+      executorId(_executorId),
+      state(DISCONNECTED) {}
+
+  virtual ~LongLivedExecutor() = default;
+
+protected:
+  virtual void initialize()
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new Mesos(
+        mesos::ContentType::PROTOBUF,
+        process::defer(self(), &Self::connected),
+        process::defer(self(), &Self::disconnected),
+        process::defer(self(), &Self::received, lambda::_1)));
+  }
+
+  void connected()
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      cout << "Received " << event.type() << " event" << endl;
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << "Subscribed executor on "
+               << event.subscribed().agent_info().hostname() << endl;
+
+          state = SUBSCRIBED;
+          break;
+        }
+
+        case Event::LAUNCH: {
+          launch(event.launch().task());
+          break;
+        }
+
+        case Event::ACKNOWLEDGED: {
+          // Remove the corresponding update.
+          updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+          // Remove the corresponding task.
+          tasks.erase(event.acknowledged().task_id());
+          break;
+        }
+
+        case Event::ERROR: {
+          cerr << "Error: " << event.error().message() << endl;
+          break;
+        }
+
+        case Event::KILL:
+        case Event::MESSAGE:
+        case Event::SHUTDOWN: {
+          break;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
+      }
+    }
+  }
+
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    // Send all unacknowledged updates.
+    foreach (const Call::Update& update, updates.values()) {
+      subscribe->add_unacknowledged_updates()->MergeFrom(update);
+    }
+
+    // Send all unacknowledged tasks.
+    foreach (const TaskInfo& task, tasks.values()) {
+      subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+    }
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void update(const TaskInfo& task, const TaskState& state)
+  {
+    UUID uuid = UUID::random();
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(task.task_id());
+    status.mutable_executor_id()->CopyFrom(executorId);
+    status.set_state(state);
+    status.set_source(TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(uuid.toBytes());
+
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.set_type(Call::UPDATE);
+
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+
+    // Capture the status update.
+    updates[uuid] = call.update();
+
+    mesos->send(call);
+  }
+
+  void launch(const TaskInfo& task)
   {
     cout << "Starting task " << task.task_id().value() << endl;
 
+    tasks[task.task_id()] = task;
+
     std::thread thread([=]() {
       os::sleep(Seconds(random() % 10));
 
-      TaskStatus status;
-      status.mutable_task_id()->MergeFrom(task.task_id());
-      status.set_state(TASK_FINISHED);
-
-      driver->sendStatusUpdate(status);
+      update(task, TaskState::TASK_FINISHED);
     });
 
     thread.detach();
 
-    TaskStatus status;
-    status.mutable_task_id()->MergeFrom(task.task_id());
-    status.set_state(TASK_RUNNING);
-
-    driver->sendStatusUpdate(status);
+    update(task, TaskState::TASK_RUNNING);
   }
+
+private:
+  const FrameworkID frameworkId;
+  const ExecutorID executorId;
+  process::Owned<Mesos> mesos;
+  enum State
+  {
+    CONNECTED,
+    DISCONNECTED,
+    SUBSCRIBED
+  } state;
+
+  LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+  LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
 };
 
 
 int main(int argc, char** argv)
 {
-  LongLivedExecutor executor;
-  MesosExecutorDriver driver(&executor);
-  return driver.run() == DRIVER_STOPPED ? 0 : 1;
+  FrameworkID frameworkId;
+  ExecutorID executorId;
+
+  Option<string> value;
+
+  value = os::getenv("MESOS_FRAMEWORK_ID");
+  if (value.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+  }
+  frameworkId.set_value(value.get());
+
+  value = os::getenv("MESOS_EXECUTOR_ID");
+  if (value.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+  }
+  executorId.set_value(value.get());
+
+  process::Owned<LongLivedExecutor> executor(
+      new LongLivedExecutor(frameworkId, executorId));
+
+  process::spawn(executor.get());
+  process::wait(executor.get());
+
+  return EXIT_SUCCESS;
 }