You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/25 01:36:46 UTC

[2/3] mesos git commit: Handle scheduler pid as optional in the slave.

Handle scheduler pid as optional in the slave.

This is in anticipation of HTTP scheduler support in 0.24.0.
Note that the 'pid' is set for driver-based schedulers. The
corresponding master changes to not set 'pid' for HTTP
schedulers have not occurred yet.

Review: https://reviews.apache.org/r/36760


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9172a5f5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9172a5f5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9172a5f5

Branch: refs/heads/master
Commit: 9172a5f50bc26c2bd88ff7382a0b5f0ccaf73b14
Parents: ac70a59
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Jul 23 15:17:22 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp       |  5 +-
 src/messages/messages.proto | 13 +++++-
 src/slave/slave.cpp         | 99 ++++++++++++++++++++++++++++++----------
 src/slave/slave.hpp         | 16 +++++--
 src/slave/state.hpp         |  4 ++
 src/tests/mesos.cpp         |  7 ++-
 src/tests/mesos.hpp         |  8 ++--
 src/tests/slave_tests.cpp   |  5 +-
 8 files changed, 111 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6d64bfc..613a011 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5042,8 +5042,9 @@ void Master::addSlave(
   // TODO(vinod): Reconcile the notion of a completed framework across the
   // master and slave.
   foreach (const Archive::Framework& completedFramework, completedFrameworks) {
-    const FrameworkID& frameworkId = completedFramework.framework_info().id();
-    Framework* framework = getFramework(frameworkId);
+    Framework* framework = getFramework(
+        completedFramework.framework_info().id());
+
     foreach (const Task& task, completedFramework.tasks()) {
       if (framework != NULL) {
         VLOG(2) << "Re-adding completed task " << task.task_id()

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 165a16d..8977d8e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -193,8 +193,15 @@ message RunTaskMessage {
   // TODO(karya): Remove framework_id after MESOS-2559 has shipped.
   optional FrameworkID framework_id = 1 [deprecated = true];
   required FrameworkInfo framework = 2;
-  required string pid = 3;
   required TaskInfo task = 4;
+
+  // The pid of the framework. This was moved to 'optional' in
+  // 0.24.0 to support schedulers using the HTTP API. For now, we
+  // continue to always set pid since it was required in 0.23.x.
+  // When 'pid' is unset, or set to empty string, the slave will
+  // forward executor messages through the master. For schedulers
+  // still using the driver, this will remain set.
+  optional string pid = 3;
 }
 
 
@@ -335,7 +342,9 @@ message ShutdownExecutorMessage {
 
 message UpdateFrameworkMessage {
   required FrameworkID framework_id = 1;
-  required string pid = 2;
+
+  // See the comment on RunTaskMessage.pid.
+  optional string pid = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 784fdc8..4ba95f9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1165,13 +1165,16 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
       VLOG(1) << "Reregistering completed framework "
                 << completedFramework->id();
+
       Archive::Framework* completedFramework_ =
         message.add_completed_frameworks();
-      FrameworkInfo* frameworkInfo =
-        completedFramework_->mutable_framework_info();
-      frameworkInfo->CopyFrom(completedFramework->info);
 
-      completedFramework_->set_pid(completedFramework->pid);
+      completedFramework_->mutable_framework_info()->CopyFrom(
+          completedFramework->info);
+
+      if (completedFramework->pid.isSome()) {
+        completedFramework_->set_pid(completedFramework->pid.get());
+      }
 
       foreach (const Owned<Executor>& executor,
                completedFramework->completedExecutors) {
@@ -1179,10 +1182,12 @@ void Slave::doReliableRegistration(Duration maxBackoff)
                 << " with " << executor->terminatedTasks.size()
                 << " terminated tasks, " << executor->completedTasks.size()
                 << " completed tasks";
+
         foreach (const Task* task, executor->terminatedTasks.values()) {
           VLOG(2) << "Reregistering terminated task " << task->task_id();
           completedFramework_->add_tasks()->CopyFrom(*task);
         }
+
         foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
           VLOG(2) << "Reregistering completed task " << task->task_id();
           completedFramework_->add_tasks()->CopyFrom(*task);
@@ -1222,7 +1227,7 @@ void Slave::runTask(
     const UPID& from,
     const FrameworkInfo& frameworkInfo_,
     const FrameworkID& frameworkId_,
-    const string& pid,
+    const UPID& pid,
     TaskInfo task)
 {
   if (master != from) {
@@ -1291,7 +1296,13 @@ void Slave::runTask(
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
     }
 
-    framework = new Framework(this, frameworkInfo, pid);
+    Option<UPID> frameworkPid = None();
+
+    if (pid != UPID()) {
+      frameworkPid = pid;
+    }
+
+    framework = new Framework(this, frameworkInfo, frameworkPid);
     frameworks[frameworkId] = framework;
 
     // Is this same framework in completedFrameworks? If so, move the completed
@@ -1340,14 +1351,13 @@ void Slave::runTask(
 
   // Run the task after the unschedules are done.
   unschedule.onAny(
-      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task));
+      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
 }
 
 
 void Slave::_runTask(
     const Future<bool>& future,
     const FrameworkInfo& frameworkInfo,
-    const string& pid,
     const TaskInfo& task)
 {
   const FrameworkID frameworkId = frameworkInfo.id();
@@ -1733,8 +1743,12 @@ void Slave::runTasks(
     RunTaskMessage message;
     message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_framework()->MergeFrom(framework->info);
-    message.set_pid(framework->pid);
     message.mutable_task()->MergeFrom(task);
+
+    // Note that 0.23.x executors require the 'pid' to be set
+    // to decode the message, but do not use the field.
+    message.set_pid(framework->pid.getOrElse(UPID()));
+
     send(executor->pid, message);
   }
 }
@@ -2087,7 +2101,9 @@ void Slave::schedulerMessage(
 }
 
 
-void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
+void Slave::updateFramework(
+    const FrameworkID& frameworkId,
+    const UPID& pid)
 {
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
@@ -2115,15 +2131,25 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
     case Framework::RUNNING: {
       LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
 
-      framework->pid = pid;
+      if (pid == UPID()) {
+        framework->pid = None();
+      } else {
+        framework->pid = pid;
+      }
+
       if (framework->info.checkpoint()) {
-        // Checkpoint the framework pid.
+        // Checkpoint the framework pid, note that when the 'pid'
+        // is None, we checkpoint a default UPID() because
+        // 0.23.x slaves consider a missing pid file to be an
+        // error.
         const string path = paths::getFrameworkPidPath(
             metaDir, info.id(), frameworkId);
 
-        VLOG(1) << "Checkpointing framework pid '"
-                << framework->pid << "' to '" << path << "'";
-        CHECK_SOME(state::checkpoint(path, framework->pid));
+        VLOG(1) << "Checkpointing framework pid"
+                << " '" << framework->pid.getOrElse(UPID()) << "'"
+                << " to '" << path << "'";
+
+        CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID())));
       }
 
       // Inform status update manager to immediately resend any pending
@@ -2989,15 +3015,23 @@ void Slave::executorMessage(
     return;
   }
 
-  LOG(INFO) << "Sending message for framework " << frameworkId
-            << " to " << framework->pid;
-
   ExecutorToFrameworkMessage message;
   message.mutable_slave_id()->MergeFrom(slaveId);
   message.mutable_framework_id()->MergeFrom(frameworkId);
   message.mutable_executor_id()->MergeFrom(executorId);
   message.set_data(data);
-  send(framework->pid, message);
+
+  CHECK_SOME(master);
+
+  if (framework->pid.isSome()) {
+    LOG(INFO) << "Sending message for framework " << frameworkId
+              << " to " << framework->pid.get();
+    send(framework->pid.get(), message);
+  } else {
+    LOG(INFO) << "Sending message for framework " << frameworkId
+              << " through the master " << master.get();
+    send(master.get(), message);
+  }
 
   metrics.valid_framework_messages++;
 }
@@ -4142,8 +4176,17 @@ void Slave::recoverFramework(const FrameworkState& state)
     CHECK_EQ(frameworkInfo.id(), state.id);
   }
 
+  // In 0.24.0, HTTP schedulers are supported and these do not
+  // have a 'pid'. In this case, the slave will checkpoint UPID().
   CHECK_SOME(state.pid);
-  Framework* framework = new Framework(this, frameworkInfo, state.pid.get());
+
+  Option<UPID> pid = state.pid.get();
+
+  if (pid.get() == UPID()) {
+    pid = None();
+  }
+
+  Framework* framework = new Framework(this, frameworkInfo, pid);
   frameworks[framework->id()] = framework;
 
   // Now recover the executors for this framework.
@@ -4662,7 +4705,7 @@ double Slave::_resources_revocable_percent(const string& name)
 Framework::Framework(
     Slave* _slave,
     const FrameworkInfo& _info,
-    const UPID& _pid)
+    const Option<UPID>& _pid)
   : state(RUNNING),
     slave(_slave),
     info(_info),
@@ -4675,15 +4718,21 @@ Framework::Framework(
         slave->metaDir, slave->info.id(), id());
 
     VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'";
+
     CHECK_SOME(state::checkpoint(path, info));
 
-    // Checkpoint the framework pid.
+    // Checkpoint the framework pid, note that we checkpoint a
+    // UPID() when it is None (for HTTP schedulers) because
+    // 0.23.x slaves consider a missing pid file to be an
+    // error.
     path = paths::getFrameworkPidPath(
         slave->metaDir, slave->info.id(), id());
 
-    VLOG(1) << "Checkpointing framework pid '"
-            << pid << "' to '" << path << "'";
-    CHECK_SOME(state::checkpoint(path, pid));
+    VLOG(1) << "Checkpointing framework pid"
+            << " '" << pid.getOrElse(UPID()) << "'"
+            << " to '" << path << "'";
+
+    CHECK_SOME(state::checkpoint(path, pid.getOrElse(UPID())));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dec4ca8..41d0949 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -117,14 +117,13 @@ public:
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task);
 
   // Made 'virtual' for Slave mocking.
   virtual void _runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task);
 
   process::Future<bool> unschedule(const std::string& path);
@@ -150,7 +149,9 @@ public:
       const ExecutorID& executorId,
       const std::string& data);
 
-  void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
+  void updateFramework(
+      const FrameworkID& frameworkId,
+      const process::UPID& pid);
 
   void checkpointResources(const std::vector<Resource>& checkpointedResources);
 
@@ -634,7 +635,7 @@ struct Framework
   Framework(
       Slave* slave,
       const FrameworkInfo& info,
-      const process::UPID& pid);
+      const Option<process::UPID>& pid);
 
   ~Framework();
 
@@ -660,7 +661,12 @@ struct Framework
 
   const FrameworkInfo info;
 
-  UPID pid;
+  // Frameworks using the scheduler driver will have a 'pid',
+  // which allows us to send executor messages directly to the
+  // driver. Frameworks using the HTTP API (in 0.24.0) will
+  // not have a 'pid', in which case executor messages are
+  // sent through the master.
+  Option<UPID> pid;
 
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pending;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 4e00468..cecf200 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -248,7 +248,11 @@ struct FrameworkState
 
   FrameworkID id;
   Option<FrameworkInfo> info;
+
+  // Note that HTTP frameworks (supported in 0.24.0) do not have a
+  // PID, in which case 'pid' is Some(UPID()) rather than None().
   Option<process::UPID> pid;
+
   hashmap<ExecutorID, ExecutorState> executors;
   unsigned int errors;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index f09ef0f..f3b7315 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -462,7 +462,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _runTask(_, _, _, _))
+  EXPECT_CALL(*this, _runTask(_, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
   EXPECT_CALL(*this, killTask(_, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
@@ -485,7 +485,7 @@ void MockSlave::unmocked_runTask(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
     const FrameworkID& frameworkId,
-    const std::string& pid,
+    const UPID& pid,
     TaskInfo task)
 {
   slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task);
@@ -495,10 +495,9 @@ void MockSlave::unmocked_runTask(
 void MockSlave::unmocked__runTask(
       const Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task)
 {
-  slave::Slave::_runTask(future, frameworkInfo, pid, task);
+  slave::Slave::_runTask(future, frameworkInfo, task);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8a76b4f..1759d7e 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -811,26 +811,24 @@ public:
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task));
 
   void unmocked_runTask(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task);
 
-  MOCK_METHOD4(_runTask, void(
+  MOCK_METHOD3(_runTask, void(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task));
 
   void unmocked__runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task);
 
   MOCK_METHOD3(killTask, void(

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b145d76..64cef6e 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1793,7 +1793,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _runTask;
-  EXPECT_CALL(slave, _runTask(_, _, _, _))
+  EXPECT_CALL(slave, _runTask(_, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_runTask),
                     SaveArg<0>(&future),
                     SaveArg<1>(&frameworkInfo)));
@@ -1818,8 +1818,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   driver.killTask(task.task_id());
 
   AWAIT_READY(killTask);
-  slave.unmocked__runTask(
-      future, frameworkInfo, master.get(), task);
+  slave.unmocked__runTask(future, frameworkInfo, task);
 
   AWAIT_READY(removeFramework);