You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2014/05/13 01:02:26 UTC

git commit: Added --override flag to command executor.

Repository: mesos
Updated Branches:
  refs/heads/master 249011b3d -> 48247e156


Added --override flag to command executor.

It will be a common use case that external containerizer programs will
have to be able to control the command being carried out when the only
the task.command or container image is present.  The command executor
will in its current state try to run the command in the context of the
slave and not the container.

So, you could imagine the command executor being used by: 1) Replace
the command to run when the slave sends a runTask message 2) Prefix a
command to wrap call in when the slave sends a runTask message.

1) have been implemented and tested in this patch, alongside making
the command-executor "cli-ized" with help messages.

Review: https://reviews.apache.org/r/21329


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/48247e15
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/48247e15
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/48247e15

Branch: refs/heads/master
Commit: 48247e156256fe246383bab74a61839e9ac4ea68
Parents: 249011b
Author: Niklas Q. Nielsen <ni...@mesosphere.io>
Authored: Mon May 12 15:41:20 2014 -0700
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Mon May 12 16:02:15 2014 -0700

----------------------------------------------------------------------
 src/launcher/executor.cpp   |  97 +++++++++++++++++++++++++---
 src/tests/containerizer.cpp |   5 +-
 src/tests/containerizer.hpp |  10 ++-
 src/tests/slave_tests.cpp   | 132 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/48247e15/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 312606b..f1ec656 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -35,7 +35,9 @@
 #include <process/timer.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/flags.hpp>
 #include <stout/lambda.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/strings.hpp>
 
@@ -61,11 +63,12 @@ using namespace process;
 class CommandExecutorProcess : public Process<CommandExecutorProcess>
 {
 public:
-  CommandExecutorProcess()
+  CommandExecutorProcess(Option<char**> override)
     : launched(false),
       killed(false),
       pid(-1),
-      escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT) {}
+      escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
+      override(override) {}
 
   virtual ~CommandExecutorProcess() {}
 
@@ -168,9 +171,24 @@ public:
       os::close(pipes[1]);
 
       // The child has successfully setsid, now run the command.
-      std::cout << "sh -c '" << task.command().value() << "'" << std::endl;
-      execl("/bin/sh", "sh", "-c",
-            task.command().value().c_str(), (char*) NULL);
+
+      if (override.isNone()) {
+        std::cout << "sh -c '" << task.command().value() << "'" << std::endl;
+        execl("/bin/sh", "sh", "-c",
+              task.command().value().c_str(), (char*) NULL);
+      } else {
+        char** argv = override.get();
+
+        // argv is guaranteed to be NULL terminated and we rely on
+        // that fact to print command to be executed.
+        for (int i = 0; argv[i] != NULL; i++) {
+          std::cout << argv[i] << " ";
+        }
+        std::cout << std::endl;
+
+        execvp(argv[0], argv);
+      }
+
       perror("Failed to exec");
       abort();
     }
@@ -339,15 +357,16 @@ private:
   pid_t pid;
   Duration escalationTimeout;
   Timer escalationTimer;
+  Option<char**> override;
 };
 
 
 class CommandExecutor: public Executor
 {
 public:
-  CommandExecutor()
+  CommandExecutor(Option<char**> override)
   {
-    process = new CommandExecutorProcess();
+    process = new CommandExecutorProcess(override);
     spawn(process);
   }
 
@@ -420,9 +439,71 @@ private:
 } // namespace mesos {
 
 
+void usage(const char* argv0, const flags::FlagsBase& flags)
+{
+  cerr << "Usage: " << os::basename(argv0).get() << " [...]" << endl
+       << endl
+       << "Supported options:" << endl
+       << flags.usage();
+}
+
+
+class Flags : public flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    add(&override,
+        "override",
+        "Whether or not to override the command the executor should run\n"
+        "when the task is launched. Only this flag is expected to be on\n"
+        "the command line and all arguments after the flag will be used as\n"
+        "the subsequent 'argv' to be used with 'execvp'",
+        false);
+
+    // TODO(nnielsen): Add 'prefix' option to enable replacing
+    // 'sh -c' with user specified wrapper.
+  }
+
+  bool override;
+};
+
+
 int main(int argc, char** argv)
 {
-  mesos::internal::CommandExecutor executor;
+  Flags flags;
+
+  bool help;
+  flags.add(&help,
+            "help",
+            "Prints this help message",
+            false);
+
+  // Load flags from environment and command line.
+  Try<Nothing> load = flags.load(None(), argc, argv);
+
+  if (load.isError()) {
+    cerr << load.error() << endl;
+    usage(argv[0], flags);
+    return -1;
+  }
+
+  if (help) {
+    usage(argv[0], flags);
+    return -1;
+  }
+
+  Option<char**> override = None();
+  if (flags.override) {
+    // TODO(nnielsen): We assume that when you run "--override" that
+    // there won't be other flags or arguments. In the future, we
+    // should be able to use MESOS-1345.
+    if (argc > 2) {
+      override = argv + 2;
+    }
+  }
+
+  mesos::internal::CommandExecutor executor(override);
   mesos::MesosExecutorDriver driver(&executor);
   return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/48247e15/src/tests/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp
index 7dacab5..3f11d35 100644
--- a/src/tests/containerizer.cpp
+++ b/src/tests/containerizer.cpp
@@ -162,7 +162,7 @@ Future<Nothing> TestContainerizer::launch(
 }
 
 
-Future<containerizer::Termination> TestContainerizer::wait(
+Future<containerizer::Termination> TestContainerizer::_wait(
     const ContainerID& containerId)
 {
   // An unknown container is possible for tests where we "drop" the
@@ -236,6 +236,9 @@ void TestContainerizer::setup()
 
   EXPECT_CALL(*this, launch(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &TestContainerizer::_launch));
+
+  EXPECT_CALL(*this, wait(_))
+    .WillRepeatedly(Invoke(this, &TestContainerizer::_wait));
 }
 
 } // namespace tests {

http://git-wip-us.apache.org/repos/asf/mesos/blob/48247e15/src/tests/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.hpp b/src/tests/containerizer.hpp
index 8e21bd1..9325864 100644
--- a/src/tests/containerizer.hpp
+++ b/src/tests/containerizer.hpp
@@ -85,9 +85,6 @@ public:
       const process::PID<slave::Slave>& slavePid,
       bool checkpoint);
 
-  virtual process::Future<containerizer::Termination> wait(
-      const ContainerID& containerId);
-
   // Additional destroy method for testing because we won't know the
   // ContainerID created for each container.
   void destroy(const FrameworkID& frameworkId, const ExecutorID& executorId);
@@ -108,6 +105,10 @@ public:
       usage,
       process::Future<ResourceStatistics>(const ContainerID&));
 
+  MOCK_METHOD1(
+      wait,
+      process::Future<containerizer::Termination>(const ContainerID&));
+
 private:
   void setup();
 
@@ -121,6 +122,9 @@ private:
       const process::PID<slave::Slave>& slavePid,
       bool checkpoint);
 
+  process::Future<containerizer::Termination> _wait(
+      const ContainerID& containerId);
+
   hashmap<ExecutorID, Executor*> executors;
 
   hashmap<std::pair<FrameworkID, ExecutorID>, ContainerID> containers_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/48247e15/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 458356d..eca276b 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -20,6 +20,7 @@
 
 #include <gmock/gmock.h>
 
+#include <map>
 #include <string>
 #include <vector>
 
@@ -29,8 +30,10 @@
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/io.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
+#include <process/subprocess.hpp>
 
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -46,6 +49,7 @@
 #include "slave/slave.hpp"
 
 #include "tests/containerizer.hpp"
+#include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
 using namespace mesos;
@@ -65,6 +69,7 @@ using process::Future;
 using process::Owned;
 using process::PID;
 
+using std::map;
 using std::string;
 using std::vector;
 
@@ -248,6 +253,133 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
 }
 
 
+// Test that we can run the mesos-executor and specify an "override"
+// command to use via the --override argument.
+TEST_F(SlaveTest, MesosExecutorWithOverride)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  TestContainerizer containerizer;
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Launch a task with the command executor.
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+
+  CommandInfo command;
+  command.set_value("sleep 10");
+
+  task.mutable_command()->MergeFrom(command);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Expect the launch but don't do anything as we'll be launching the
+  // executor ourselves manually below.
+  Future<Nothing> launch;
+  EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&launch),
+                    Return(Future<Nothing>())));
+
+  // Expect wait after launch is called. wait() will fail if not
+  // intercepted here as the container will never be registered within
+  // the TestContainerizer when launch() is intercepted above.
+  Future<Nothing> wait;
+  process::Promise<containerizer::Termination> promise;
+  EXPECT_CALL(containerizer, wait(_))
+    .WillOnce(DoAll(FutureSatisfy(&wait),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Once we get the launch the mesos-executor with --override.
+  AWAIT_READY(launch);
+
+  // Set up fake environment for executor.
+  map<string, string> environment;
+  environment["MESOS_SLAVE_PID"] = stringify(slave.get());
+  environment["MESOS_SLAVE_ID"] = stringify(offers.get()[0].slave_id());
+  environment["MESOS_FRAMEWORK_ID"] = stringify(offers.get()[0].framework_id());
+  environment["MESOS_EXECUTOR_ID"] = stringify(task.task_id());
+  environment["MESOS_DIRECTORY"] = "";
+
+  // Create temporary file to store validation string. If command is
+  // succesfully replaced, this file will end up containing the string
+  // 'Hello World\n'. Otherwise, the original task command i.e.
+  // 'sleep' will be called and the test will fail.
+  Try<std::string> file = os::mktemp();
+  ASSERT_SOME(file);
+
+  string executorCommand =
+    path::join(tests::flags.build_dir, "src", "mesos-executor") +
+    " --override /bin/sh -c 'echo hello world >" + file.get() + "'";
+
+  // Expect two status updates, one for once the mesos-executor says
+  // the task is running and one for after our overridden command
+  // above finishes.
+  Future<TaskStatus> status1, status2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  Try<process::Subprocess> executor =
+    process::subprocess(executorCommand, environment);
+
+  ASSERT_SOME(executor);
+
+  // Scheduler should receive the TASK_RUNNING update.
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(status2);
+  ASSERT_EQ(TASK_FINISHED, status2.get().state());
+
+  containerizer::Termination termination;
+  termination.set_killed(false);
+  termination.set_message("Killed executor");
+  termination.set_status(0);
+  promise.set(termination);
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(executor.get().status());
+
+  // Verify file contents.
+  Try<std::string> validate = os::read(file.get());
+  ASSERT_SOME(validate);
+
+  EXPECT_EQ(validate.get(), "hello world\n");
+
+  os::rm(file.get());
+
+  Shutdown();
+}
+
+
 // This test runs a command without the command user field set. The
 // command will verify the assumption that the command is run as the
 // slave user (in this case, root).