You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/10 18:28:55 UTC

[3/5] mesos git commit: Made the command executor use the unversioned protobufs internally.

Made the command executor use the unversioned protobufs internally.

Previously, the command executor lived in the `v1` namespace and
used the v1 protobufs. This change moves the code to the
`mesos::internal` namespace and switches it to the unversioned protobufs.

Review: https://reviews.apache.org/r/50411/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/64842e4c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/64842e4c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/64842e4c

Branch: refs/heads/master
Commit: 64842e4cdc24900f586f87bb645b6d6d88da4804
Parents: 7070d1e
Author: Anand Mazumdar <an...@apache.org>
Authored: Wed Aug 10 11:04:02 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Wed Aug 10 11:04:02 2016 -0700

----------------------------------------------------------------------
 src/launcher/executor.cpp       | 180 +++++++++++++++++------------------
 src/launcher/posix/executor.cpp |   9 +-
 src/launcher/posix/executor.hpp |   6 +-
 3 files changed, 91 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 52d20af..0e23d4f 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -28,9 +28,6 @@
 
 #include <mesos/mesos.hpp>
 
-#include <mesos/v1/executor.hpp>
-#include <mesos/v1/mesos.hpp>
-
 #include <mesos/type_utils.hpp>
 
 #include <process/clock.hpp>
@@ -110,25 +107,14 @@ using process::Subprocess;
 using process::Time;
 using process::Timer;
 
-using mesos::internal::devolve;
-using mesos::internal::evolve;
-using mesos::internal::HealthChecker;
-using mesos::internal::TaskHealthStatus;
-
-using mesos::internal::protobuf::frameworkHasCapability;
+using mesos::executor::Call;
+using mesos::executor::Event;
 
-using mesos::v1::ExecutorID;
-using mesos::v1::FrameworkID;
-
-using mesos::v1::executor::Call;
-using mesos::v1::executor::Event;
 using mesos::v1::executor::Mesos;
 using mesos::v1::executor::MesosBase;
 using mesos::v1::executor::V0ToV1Adapter;
 
-
 namespace mesos {
-namespace v1 {
 namespace internal {
 
 class CommandExecutor: public ProtobufProcess<CommandExecutor>
@@ -190,65 +176,60 @@ public:
     state = DISCONNECTED;
   }
 
-  void received(queue<Event> events)
+  void received(const Event& event)
   {
-    while (!events.empty()) {
-      Event event = events.front();
-      events.pop();
-
-      cout << "Received " << event.type() << " event" << endl;
-
-      switch (event.type()) {
-        case Event::SUBSCRIBED: {
-          cout << "Subscribed executor on "
-               << event.subscribed().agent_info().hostname() << endl;
-
-          frameworkInfo = event.subscribed().framework_info();
-          state = SUBSCRIBED;
-          break;
-        }
-
-        case Event::LAUNCH: {
-          launch(event.launch().task());
-          break;
-        }
-
-        case Event::KILL: {
-          Option<KillPolicy> override = event.kill().has_kill_policy()
-            ? Option<KillPolicy>(event.kill().kill_policy())
-            : None();
-
-          kill(event.kill().task_id(), override);
-          break;
-        }
-
-        case Event::ACKNOWLEDGED: {
-          // Remove the corresponding update.
-          updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get());
-
-          // Remove the corresponding task.
-          task = None();
-          break;
-        }
-
-        case Event::SHUTDOWN: {
-          shutdown();
-          break;
-        }
-
-        case Event::MESSAGE: {
-          break;
-        }
-
-        case Event::ERROR: {
-          cerr << "Error: " << event.error().message() << endl;
-          break;
-        }
-
-        case Event::UNKNOWN: {
-          LOG(WARNING) << "Received an UNKNOWN event and ignored";
-          break;
-        }
+    cout << "Received " << event.type() << " event" << endl;
+
+    switch (event.type()) {
+      case Event::SUBSCRIBED: {
+        cout << "Subscribed executor on "
+             << event.subscribed().slave_info().hostname() << endl;
+
+        frameworkInfo = event.subscribed().framework_info();
+        state = SUBSCRIBED;
+        break;
+      }
+
+      case Event::LAUNCH: {
+        launch(event.launch().task());
+        break;
+      }
+
+      case Event::KILL: {
+        Option<KillPolicy> override = event.kill().has_kill_policy()
+          ? Option<KillPolicy>(event.kill().kill_policy())
+          : None();
+
+        kill(event.kill().task_id(), override);
+        break;
+      }
+
+      case Event::ACKNOWLEDGED: {
+        // Remove the corresponding update.
+        updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get());
+
+        // Remove the corresponding task.
+        task = None();
+        break;
+      }
+
+      case Event::SHUTDOWN: {
+        shutdown();
+        break;
+      }
+
+      case Event::MESSAGE: {
+        break;
+      }
+
+      case Event::ERROR: {
+        cerr << "Error: " << event.error().message() << endl;
+        break;
+      }
+
+      case Event::UNKNOWN: {
+        LOG(WARNING) << "Received an UNKNOWN event and ignored";
+        break;
       }
     }
   }
@@ -271,31 +252,45 @@ protected:
     // after the process has spawned.
     if (value.isSome() && value.get() == "1") {
       mesos.reset(new Mesos(
-          mesos::ContentType::PROTOBUF,
+          ContentType::PROTOBUF,
           defer(self(), &Self::connected),
           defer(self(), &Self::disconnected),
-          defer(self(), &Self::received, lambda::_1)));
+          defer(self(), [this](queue<v1::executor::Event> events) {
+            while(!events.empty()) {
+              const v1::executor::Event& event = events.front();
+              received(devolve(event));
+
+              events.pop();
+            }
+          })));
     } else {
       mesos.reset(new V0ToV1Adapter(
           defer(self(), &Self::connected),
           defer(self(), &Self::disconnected),
-          defer(self(), &Self::received, lambda::_1)));
+          defer(self(), [this](queue<v1::executor::Event> events) {
+            while(!events.empty()) {
+              const v1::executor::Event& event = events.front();
+              received(devolve(event));
+
+              events.pop();
+            }
+          })));
     }
   }
 
   void taskHealthUpdated(
-      const mesos::TaskID& taskID,
+      const TaskID& taskID,
       const bool healthy,
       const bool initiateTaskKill)
   {
     cout << "Received task health update, healthy: "
          << stringify(healthy) << endl;
 
-    update(evolve(taskID), TASK_RUNNING, healthy);
+    update(taskID, TASK_RUNNING, healthy);
 
     if (initiateTaskKill) {
       killedByHealthCheck = true;
-      kill(evolve(taskID));
+      kill(taskID);
     }
   }
 
@@ -323,7 +318,7 @@ protected:
       subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
     }
 
-    mesos->send(call);
+    mesos->send(evolve(call));
 
     delay(Seconds(1), self(), &Self::doReliableRegistration);
   }
@@ -362,7 +357,7 @@ protected:
         ABORT("Failed to parse JSON: " + object.error());
       }
 
-      Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+      Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(object.get());
       if (parse.isError()) {
         ABORT("Failed to parse protobuf: " + parse.error());
       }
@@ -418,9 +413,9 @@ protected:
 
     if (task->has_health_check()) {
       Try<Owned<HealthChecker>> _checker = HealthChecker::create(
-          devolve(task->health_check()),
+          task->health_check(),
           self(),
-          devolve(task->task_id()));
+          task->task_id());
 
       if (_checker.isError()) {
         // TODO(gilbert): Consider ABORT and return a TASK_FAILED here.
@@ -559,9 +554,9 @@ private:
       CHECK_SOME(taskId);
       CHECK(taskId.get() == _taskId);
 
-      if (frameworkHasCapability(
-              devolve(frameworkInfo.get()),
-              mesos::FrameworkInfo::Capability::TASK_KILLING_STATE)) {
+      if (protobuf::frameworkHasCapability(
+              frameworkInfo.get(),
+              FrameworkInfo::Capability::TASK_KILLING_STATE)) {
         update(taskId.get(), TASK_KILLING);
       }
 
@@ -714,7 +709,7 @@ private:
     // Capture the status update.
     updates[uuid] = call.update();
 
-    mesos->send(call);
+    mesos->send(evolve(call));
   }
 
   enum State
@@ -757,7 +752,6 @@ private:
 };
 
 } // namespace internal {
-} // namespace v1 {
 } // namespace mesos {
 
 
@@ -811,8 +805,8 @@ public:
 int main(int argc, char** argv)
 {
   Flags flags;
-  FrameworkID frameworkId;
-  ExecutorID executorId;
+  mesos::FrameworkID frameworkId;
+  mesos::ExecutorID executorId;
 
 #ifdef __WINDOWS__
   process::Winsock winsock;
@@ -869,8 +863,8 @@ int main(int argc, char** argv)
     shutdownGracePeriod = parse.get();
   }
 
-  Owned<mesos::v1::internal::CommandExecutor> executor(
-      new mesos::v1::internal::CommandExecutor(
+  Owned<mesos::internal::CommandExecutor> executor(
+      new mesos::internal::CommandExecutor(
           flags.launcher_dir,
           flags.rootfs,
           flags.sandbox_directory,

http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/posix/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/posix/executor.cpp b/src/launcher/posix/executor.cpp
index 6814b9f..43573ca 100644
--- a/src/launcher/posix/executor.cpp
+++ b/src/launcher/posix/executor.cpp
@@ -24,8 +24,6 @@
 
 #include <stout/os/raw/argv.hpp>
 
-#include "internal/devolve.hpp"
-
 #include "launcher/posix/executor.hpp"
 
 #ifdef __linux__
@@ -48,16 +46,14 @@ using std::endl;
 using std::string;
 using std::vector;
 
-using mesos::internal::devolve;
 using mesos::internal::slave::MESOS_CONTAINERIZER;
 using mesos::internal::slave::MesosContainerizerLaunch;
 
 namespace mesos {
-namespace v1 {
 namespace internal {
 
 pid_t launchTaskPosix(
-    const mesos::v1::CommandInfo& command,
+    const CommandInfo& command,
     const string& launcherDir,
     const Option<string>& user,
     const Option<string>& rootfs,
@@ -85,7 +81,7 @@ pid_t launchTaskPosix(
   // Prepare the flags to pass to the launch process.
   MesosContainerizerLaunch::Flags launchFlags;
 
-  launchFlags.command = JSON::protobuf(devolve(command));
+  launchFlags.command = JSON::protobuf(command);
 
   if (rootfs.isSome()) {
     CHECK_SOME(sandboxDirectory);
@@ -127,5 +123,4 @@ pid_t launchTaskPosix(
 }
 
 } // namespace internal {
-} // namespace v1 {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/posix/executor.hpp
----------------------------------------------------------------------
diff --git a/src/launcher/posix/executor.hpp b/src/launcher/posix/executor.hpp
index a508089..9e46726 100644
--- a/src/launcher/posix/executor.hpp
+++ b/src/launcher/posix/executor.hpp
@@ -19,16 +19,15 @@
 
 #include <string>
 
-#include <mesos/v1/mesos.hpp>
+#include <mesos/mesos.hpp>
 
 #include <stout/option.hpp>
 
 namespace mesos {
-namespace v1 {
 namespace internal {
 
 pid_t launchTaskPosix(
-    const mesos::v1::CommandInfo& command,
+    const CommandInfo& command,
     const std::string& launcherDir,
     const Option<std::string>& user,
     const Option<std::string>& rootfs,
@@ -36,7 +35,6 @@ pid_t launchTaskPosix(
     const Option<std::string>& workingDirectory);
 
 } // namespace internal {
-} // namespace v1 {
 } // namespace mesos {
 
 #endif // __LAUNCHER_POSIX_EXECUTOR_HPP__