You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/03/03 11:46:00 UTC

[1/3] mesos git commit: Added `FrameworkInfo` to `UpdateFrameworkMessage`.

Repository: mesos
Updated Branches:
  refs/heads/master 61777fdaa -> ed5203745


Added `FrameworkInfo` to `UpdateFrameworkMessage`.

Review: https://reviews.apache.org/r/57106


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/60577e77
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/60577e77
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/60577e77

Branch: refs/heads/master
Commit: 60577e7730315f0e591527aac7abdf73aa50de5f
Parents: 61777fd
Author: Michael Park <mp...@apache.org>
Authored: Sun Feb 12 17:03:29 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Fri Mar 3 03:38:19 2017 -0800

----------------------------------------------------------------------
 src/messages/messages.proto | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/60577e77/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 2da89a8..508ff59 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -568,6 +568,9 @@ message UpdateFrameworkMessage {
 
   // See the comment on RunTaskMessage.pid.
   optional string pid = 2;
+
+  // Updated framework info.
+  optional FrameworkInfo framework_info = 3;
 }
 
 


[2/3] mesos git commit: Handled the `FrameworkInfo` field added to the `UpdateFrameworkMessage`.

Posted by mp...@apache.org.
Handled the `FrameworkInfo` field added to the `UpdateFrameworkMessage`.

Review: https://reviews.apache.org/r/57108


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4912f341
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4912f341
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4912f341

Branch: refs/heads/master
Commit: 4912f341dce12f10b43fe82db20086032391e95a
Parents: 60577e7
Author: Michael Park <mp...@apache.org>
Authored: Sun Feb 12 17:36:32 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Fri Mar 3 03:39:02 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp              |  9 ++++++---
 src/slave/slave.cpp                | 30 +++++++++++++-----------------
 src/slave/slave.hpp                |  5 +++--
 src/tests/slave_recovery_tests.cpp | 10 +++++++++-
 src/tests/slave_tests.cpp          |  8 +++++++-
 5 files changed, 38 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4912f341/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 442fbc6..43e6fad 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2676,12 +2676,13 @@ void Master::_subscribe(
   // it currently isn't running any tasks.
   foreachvalue (Slave* slave, slaves.registered) {
     UpdateFrameworkMessage message;
-    message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
+    message.mutable_framework_id()->CopyFrom(frameworkInfo.id());
 
     // TODO(anand): We set 'pid' to UPID() for http frameworks
     // as 'pid' was made optional in 0.24.0. In 0.25.0, we
     // no longer have to set pid here for http frameworks.
     message.set_pid(UPID());
+    message.mutable_framework_info()->CopyFrom(frameworkInfo);
     send(slave->pid, message);
   }
 }
@@ -3038,8 +3039,9 @@ void Master::_subscribe(
   // it currently isn't running any tasks.
   foreachvalue (Slave* slave, slaves.registered) {
     UpdateFrameworkMessage message;
-    message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
+    message.mutable_framework_id()->CopyFrom(frameworkInfo.id());
     message.set_pid(from);
+    message.mutable_framework_info()->CopyFrom(frameworkInfo);
     send(slave->pid, message);
   }
 }
@@ -5961,12 +5963,13 @@ void Master::__reregisterSlave(
     // re-registering slaves.
     if (framework != nullptr && framework->connected()) {
       UpdateFrameworkMessage message;
-      message.mutable_framework_id()->MergeFrom(framework->id());
+      message.mutable_framework_id()->CopyFrom(framework->id());
 
       // TODO(anand): We set 'pid' to UPID() for http frameworks
       // as 'pid' was made optional in 0.24.0. In 0.25.0, we
       // no longer have to set pid here for http frameworks.
       message.set_pid(framework->pid.getOrElse(UPID()));
+      message.mutable_framework_info()->CopyFrom(framework->info);
 
       send(slave->pid, message);
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4912f341/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ae9458..4db367c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -616,7 +616,8 @@ void Slave::initialize()
   install<UpdateFrameworkMessage>(
       &Slave::updateFramework,
       &UpdateFrameworkMessage::framework_id,
-      &UpdateFrameworkMessage::pid);
+      &UpdateFrameworkMessage::pid,
+      &UpdateFrameworkMessage::framework_info);
 
   install<CheckpointResourcesMessage>(
       &Slave::checkpointResources,
@@ -2799,7 +2800,8 @@ void Slave::schedulerMessage(
 
 void Slave::updateFramework(
     const FrameworkID& frameworkId,
-    const UPID& pid)
+    const UPID& pid,
+    const FrameworkInfo& frameworkInfo)
 {
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
@@ -2814,18 +2816,23 @@ void Slave::updateFramework(
 
   Framework* framework = getFramework(frameworkId);
   if (framework == nullptr) {
-    LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
+    LOG(WARNING) << "Ignoring info update for framework " << frameworkId
                  << " because it does not exist";
     return;
   }
 
   switch (framework->state) {
     case Framework::TERMINATING:
-      LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
+      LOG(WARNING) << "Ignoring info update for framework " << frameworkId
                    << " because it is terminating";
       break;
     case Framework::RUNNING: {
-      LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
+      LOG(INFO) << "Updating info for framework " << frameworkId
+                << (pid != UPID() ? " with pid updated to " + stringify(pid)
+                                  : "");
+
+      framework->info.CopyFrom(frameworkInfo);
+      framework->capabilities = frameworkInfo.capabilities();
 
       if (pid == UPID()) {
         framework->pid = None();
@@ -2834,18 +2841,7 @@ void Slave::updateFramework(
       }
 
       if (framework->info.checkpoint()) {
-        // Checkpoint the framework pid, note that when the 'pid'
-        // is None, we checkpoint a default UPID() because
-        // 0.23.x slaves consider a missing pid file to be an
-        // error.
-        const string path = paths::getFrameworkPidPath(
-            metaDir, info.id(), frameworkId);
-
-        VLOG(1) << "Checkpointing framework pid"
-                << " '" << framework->pid.getOrElse(UPID()) << "'"
-                << " to '" << path << "'";
-
-        CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID())));
+        framework->checkpointFramework();
       }
 
       // Inform status update manager to immediately resend any pending

http://git-wip-us.apache.org/repos/asf/mesos/blob/4912f341/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 449971b..e6fac20 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -187,7 +187,8 @@ public:
 
   void updateFramework(
       const FrameworkID& frameworkId,
-      const process::UPID& pid);
+      const process::UPID& pid,
+      const FrameworkInfo& frameworkInfo);
 
   void checkpointResources(const std::vector<Resource>& checkpointedResources);
 
@@ -1065,7 +1066,7 @@ struct Framework
   // the 'Slave' class.
   Slave* slave;
 
-  const FrameworkInfo info;
+  FrameworkInfo info;
 
   protobuf::framework::Capabilities capabilities;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4912f341/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index b5b8058..a29b29c 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1976,9 +1976,17 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework)
 
   Future<Nothing> updateFramework = FUTURE_DISPATCH(_, &Slave::updateFramework);
 
+  // Set the `FrameworkID` in `FrameworkInfo`.
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
   // Simulate a 'UpdateFrameworkMessage' to ensure framework pid is
   // not being checkpointed.
-  process::dispatch(slave.get()->pid, &Slave::updateFramework, frameworkId, "");
+  process::dispatch(
+      slave.get()->pid,
+      &Slave::updateFramework,
+      frameworkId,
+      "",
+      frameworkInfo);
 
   AWAIT_READY(updateFramework);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4912f341/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 3731c76..ec2cd34 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4619,9 +4619,11 @@ TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade)
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
   ASSERT_SOME(slave);
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   Future<FrameworkID> frameworkId;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -4644,11 +4646,15 @@ TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade)
   AWAIT_READY(frameworkId);
   AWAIT_READY(launchTask);
 
+  // Set the `FrameworkID` in `FrameworkInfo`.
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId.get());
+
   // Now spoof a live upgrade of the framework by updating
   // the framework information to have an empty pid.
   UpdateFrameworkMessage updateFrameworkMessage;
   updateFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId.get());
   updateFrameworkMessage.set_pid("");
+  updateFrameworkMessage.mutable_framework_info()->CopyFrom(frameworkInfo);
 
   process::post(master.get()->pid, slave.get()->pid, updateFrameworkMessage);
 


[3/3] mesos git commit: Re-checkpointed the `Executor`s and `Task`s during agent recovery.

Posted by mp...@apache.org.
Re-checkpointed the `Executor`s and `Task`s during agent recovery.

Review: https://reviews.apache.org/r/57109


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed520374
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed520374
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed520374

Branch: refs/heads/master
Commit: ed52037457e471a26e00888bd11efda12c0593d2
Parents: 4912f34
Author: Michael Park <mp...@apache.org>
Authored: Mon Feb 13 14:38:49 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Fri Mar 3 03:39:59 2017 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 81 +++++++++++++++++++++++++++++++++++-------------
 src/slave/slave.hpp | 17 ++++++++--
 2 files changed, 73 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ed520374/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 4db367c..775f43b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5295,6 +5295,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
       const FrameworkInfo& frameworkInfo) {
     set<string> roles = protobuf::framework::getRoles(frameworkInfo);
 
+    bool injectedAllocationInfo = false;
     foreach (Resource& resource, *resources) {
       if (!resource.has_allocation_info()) {
         if (roles.size() != 1) {
@@ -5304,14 +5305,22 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
         }
 
         resource.mutable_allocation_info()->set_role(*roles.begin());
+        injectedAllocationInfo = true;
       }
     }
+
+    return injectedAllocationInfo;
   };
 
-  // TODO(bmahler): We currently don't allow frameworks to
-  // change their roles so we do not need to re-persist the
-  // resources with `AllocationInfo` injected for existing
-  // tasks and executors.
+  // In order to allow frameworks to change their role(s), we need to keep
+  // track of the fact that the resources used to be implicitly allocated to
+  // `FrameworkInfo.role` before the agent upgrade. To this end, we inject
+  // the `AllocationInfo` into the resources in `ExecutorState` and
+  // `TaskState`, and re-checkpoint them if necessary.
+
+  hashset<ExecutorID> injectedExecutors;
+  hashmap<ExecutorID, hashset<TaskID>> injectedTasks;
+
   if (slaveState.isSome()) {
     foreachvalue (FrameworkState& frameworkState, slaveState->frameworks) {
       if (!frameworkState.info.isSome()) {
@@ -5323,9 +5332,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
           continue;
         }
 
-        injectAllocationInfo(
-            executorState.info->mutable_resources(),
-            frameworkState.info.get());
+        if (injectAllocationInfo(
+                executorState.info->mutable_resources(),
+                frameworkState.info.get())) {
+          injectedExecutors.insert(executorState.id);
+        }
 
         foreachvalue (RunState& runState, executorState.runs) {
           foreachvalue (TaskState& taskState, runState.tasks) {
@@ -5333,9 +5344,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
               continue;
             }
 
-            injectAllocationInfo(
-                taskState.info->mutable_resources(),
-                frameworkState.info.get());
+            if (injectAllocationInfo(
+                    taskState.info->mutable_resources(),
+                    frameworkState.info.get())) {
+              injectedTasks[executorState.id].insert(taskState.id);
+            }
           }
         }
       }
@@ -5452,7 +5465,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
     // Recover the frameworks.
     foreachvalue (const FrameworkState& frameworkState,
                   slaveState.get().frameworks) {
-      recoverFramework(frameworkState);
+      recoverFramework(frameworkState, injectedExecutors, injectedTasks);
     }
   }
 
@@ -5638,7 +5651,10 @@ void Slave::__recover(const Future<Nothing>& future)
 }
 
 
-void Slave::recoverFramework(const FrameworkState& state)
+void Slave::recoverFramework(
+    const FrameworkState& state,
+    const hashset<ExecutorID>& executorsToRecheckpoint,
+    const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint)
 {
   LOG(INFO) << "Recovering framework " << state.id;
 
@@ -5692,7 +5708,12 @@ void Slave::recoverFramework(const FrameworkState& state)
 
   // Now recover the executors for this framework.
   foreachvalue (const ExecutorState& executorState, state.executors) {
-    framework->recoverExecutor(executorState);
+    framework->recoverExecutor(
+        executorState,
+        executorsToRecheckpoint.contains(executorState.id),
+        tasksToRecheckpoint.contains(executorState.id)
+            ? tasksToRecheckpoint.at(executorState.id)
+            : hashset<TaskID>{});
   }
 
   // Remove the framework in case we didn't recover any executors.
@@ -6642,7 +6663,10 @@ Executor* Slave::getExecutor(const ContainerID& containerId) const
 }
 
 
-void Framework::recoverExecutor(const ExecutorState& state)
+void Framework::recoverExecutor(
+    const ExecutorState& state,
+    bool recheckpointExecutor,
+    const hashset<TaskID>& tasksToRecheckpoint)
 {
   LOG(INFO) << "Recovering executor '" << state.id
             << "' of framework " << id();
@@ -6738,7 +6762,9 @@ void Framework::recoverExecutor(const ExecutorState& state)
 
   // And finally recover all the executor's tasks.
   foreachvalue (const TaskState& taskState, run.get().tasks) {
-    executor->recoverTask(taskState);
+    executor->recoverTask(
+        taskState,
+        tasksToRecheckpoint.contains(taskState.id));
   }
 
   ExecutorID executorId = state.id;
@@ -6762,6 +6788,9 @@ void Framework::recoverExecutor(const ExecutorState& state)
 
   // Add the executor to the framework.
   executors[executor->id] = executor;
+  if (recheckpointExecutor) {
+    executor->checkpointExecutor();
+  }
 
   // If the latest run of the executor was completed (i.e., terminated
   // and all updates are acknowledged) in the previous run, we
@@ -6890,8 +6919,6 @@ void Executor::checkpointExecutor()
 {
   CHECK(checkpoint);
 
-  CHECK_NE(slave->state, slave->RECOVERING);
-
   // Checkpoint the executor info.
   const string path = paths::getExecutorInfoPath(
       slave->metaDir, slave->info.id(), frameworkId, id);
@@ -6908,23 +6935,28 @@ void Executor::checkpointExecutor()
 
 void Executor::checkpointTask(const TaskInfo& task)
 {
+  checkpointTask(protobuf::createTask(task, TASK_STAGING, frameworkId));
+}
+
+
+void Executor::checkpointTask(const Task& task)
+{
   CHECK(checkpoint);
 
-  const Task t = protobuf::createTask(task, TASK_STAGING, frameworkId);
   const string path = paths::getTaskInfoPath(
       slave->metaDir,
       slave->info.id(),
       frameworkId,
       id,
       containerId,
-      t.task_id());
+      task.task_id());
 
   VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
-  CHECK_SOME(state::checkpoint(path, t));
+  CHECK_SOME(state::checkpoint(path, task));
 }
 
 
-void Executor::recoverTask(const TaskState& state)
+void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
 {
   if (state.info.isNone()) {
     LOG(WARNING) << "Skipping recovery of task " << state.id
@@ -6938,7 +6970,12 @@ void Executor::recoverTask(const TaskState& state)
     CHECK(resource.has_allocation_info());
   }
 
-  launchedTasks[state.id] = new Task(state.info.get());
+  Task* task = new Task(state.info.get());
+  if (recheckpointTask) {
+    checkpointTask(*task);
+  }
+
+  launchedTasks[state.id] = task;
 
   // NOTE: Since some tasks might have been terminated when the
   // slave was down, the executor resources we capture here is an

http://git-wip-us.apache.org/repos/asf/mesos/blob/ed520374/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index e6fac20..857338c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -411,7 +411,10 @@ public:
   virtual void __recover(const process::Future<Nothing>& future);
 
   // Helper to recover a framework from the specified state.
-  void recoverFramework(const state::FrameworkState& state);
+  void recoverFramework(
+      const state::FrameworkState& state,
+      const hashset<ExecutorID>& executorsToRecheckpoint,
+      const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint);
 
   // Removes and garbage collects the executor.
   void removeExecutor(Framework* framework, Executor* executor);
@@ -890,7 +893,10 @@ struct Executor
   void completeTask(const TaskID& taskId);
   void checkpointExecutor();
   void checkpointTask(const TaskInfo& task);
-  void recoverTask(const state::TaskState& state);
+  void checkpointTask(const Task& task);
+
+  void recoverTask(const state::TaskState& state, bool recheckpointTask);
+
   Try<Nothing> updateTaskState(const TaskStatus& status);
 
   // Returns true if there are any queued/launched/terminated tasks.
@@ -1050,7 +1056,12 @@ struct Framework
   void destroyExecutor(const ExecutorID& executorId);
   Executor* getExecutor(const ExecutorID& executorId) const;
   Executor* getExecutor(const TaskID& taskId) const;
-  void recoverExecutor(const state::ExecutorState& state);
+
+  void recoverExecutor(
+      const state::ExecutorState& state,
+      bool recheckpointExecutor,
+      const hashset<TaskID>& tasksToRecheckpoint);
+
   void checkpointFramework() const;
 
   const FrameworkID id() const { return info.id(); }