You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/05 19:43:53 UTC

[4/5] git commit: Refactored slave recovery so that slave recovers its state before the isolator and status update manager.

Refactored slave recovery so that the slave recovers its own state
(frameworks and executors) before the status update manager and the
isolator are recovered.

Review: https://reviews.apache.org/r/13216


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/83d5c257
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/83d5c257
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/83d5c257

Branch: refs/heads/master
Commit: 83d5c257ec2b0229231a0c49c6fc516ef3259e8e
Parents: dfba04a
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 1 22:22:21 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Sat Aug 3 13:31:51 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 103 +++++++++++++++++++++++++----------------------
 src/slave/slave.hpp |   2 +-
 2 files changed, 56 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/83d5c257/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8c953c2..9cd7754 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2582,8 +2582,12 @@ Future<Nothing> Slave::recover(bool reconnect, bool strict)
 
   info = state.get().info.get(); // Recover the slave info.
 
-  // Recover the status update manager, then
-  // the isolator and then the executors.
+  // First, recover the frameworks and executors.
+  foreachvalue (const FrameworkState& frameworkState, state.get().frameworks) {
+    recoverFramework(frameworkState);
+  }
+
+  // Now recover the status update manager and then the isolator.
   return statusUpdateManager->recover(metaDir, state.get())
            .then(defer(isolator, &Isolator::recover, state.get()))
            .then(defer(self(), &Self::_recover, state.get(), reconnect));
@@ -2592,8 +2596,48 @@ Future<Nothing> Slave::recover(bool reconnect, bool strict)
 
 Future<Nothing> Slave::_recover(const SlaveState& state, bool reconnect)
 {
-  foreachvalue (const FrameworkState& frameworkState, state.frameworks) {
-    recoverFramework(frameworkState, reconnect);
+  foreachvalue(Framework* framework, frameworks){
+    foreachvalue(Executor* executor, framework->executors) {
+      // Monitor the executor.
+      monitor.watch(
+          framework->id,
+          executor->id,
+          executor->info,
+          flags.resource_monitoring_interval)
+        .onAny(lambda::bind(_watch, lambda::_1, framework->id, executor->id));
+
+      if (reconnect) {
+        if (executor->pid) {
+          LOG(INFO) << "Sending reconnect request to executor " << executor->id
+                    << " of framework " << framework->id
+                    << " at " << executor->pid;
+
+          ReconnectExecutorMessage message;
+          message.mutable_slave_id()->MergeFrom(info.id());
+          send(executor->pid, message);
+        } else {
+          LOG(INFO) << "Unable to reconnect to executor '" << executor->id
+                    << "' of framework " << framework->id
+                    << " because no libprocess PID was found";
+        }
+      } else {
+        if (executor->pid) {
+          // Cleanup executors.
+          LOG(INFO) << "Sending shutdown to executor '" << executor->id
+                    << "' of framework " << framework->id
+                    << " to " << executor->pid;
+
+          shutdownExecutor(framework, executor);
+        } else {
+          LOG(INFO) << "Killing executor '" << executor->id
+                    << "' of framework " << framework->id
+                    << " because no libprocess PID was found";
+
+          dispatch(
+              isolator, &Isolator::killExecutor, framework->id, executor->id);
+        }
+      }
+    }
   }
 
   if (reconnect) {
@@ -2612,8 +2656,10 @@ Future<Nothing> Slave::_recover(const SlaveState& state, bool reconnect)
 }
 
 
-void Slave::recoverFramework(const FrameworkState& state, bool reconnect)
+void Slave::recoverFramework(const FrameworkState& state)
 {
+  LOG(INFO) << "Recovering framework " << state.id;
+
   if (state.executors.empty()) {
     // GC the framework work directory.
     gc.schedule(flags.gc_delay,
@@ -2646,46 +2692,6 @@ void Slave::recoverFramework(const FrameworkState& state, bool reconnect)
                    &Self::fileAttached,
                    params::_1,
                    executor->directory));
-
-    // And monitor the executor.
-    monitor.watch(
-        framework->id,
-        executor->id,
-        executor->info,
-        flags.resource_monitoring_interval)
-      .onAny(lambda::bind(_watch, lambda::_1, framework->id, executor->id));
-
-    if (reconnect) {
-      if (executor->pid) {
-        LOG(INFO) << "Sending reconnect request to executor " << executor->id
-                  << " of framework " << framework->id
-                  << " at " << executor->pid;
-
-        ReconnectExecutorMessage message;
-        message.mutable_slave_id()->MergeFrom(info.id());
-        send(executor->pid, message);
-      } else {
-        LOG(INFO) << "Unable to reconnect to executor '" << executor->id
-                  << "' of framework " << framework->id
-                  << " because no libprocess PID was found";
-      }
-    } else {
-      if (executor->pid) {
-        // Cleanup executors.
-        LOG(INFO) << "Sending shutdown to executor '" << executor->id
-                  << "' of framework " << framework->id
-                  << " to " << executor->pid;
-
-        shutdownExecutor(framework, executor);
-      } else {
-        LOG(INFO) << "Killing executor '" << executor->id
-                  << "' of framework " << framework->id
-                  << " because no libprocess PID was found";
-
-        dispatch(
-            isolator, &Isolator::killExecutor, framework->id, executor->id);
-      }
-    }
   }
 
   // Remove the framework in case we didn't recover any executors.
@@ -2870,8 +2876,6 @@ Executor* Framework::recoverExecutor(const ExecutorState& state)
     }
   }
 
-  CHECK_NOTNULL(slave);
-
   // Create executor.
   const string& directory = paths::getExecutorRunPath(
       slave->flags.work_dir, slave->info.id(), id, state.id, uuid);
@@ -2889,7 +2893,10 @@ Executor* Framework::recoverExecutor(const ExecutorState& state)
     // libprocess pid. So, it is not possible for the libprocess pid
     // to exist but not the forked pid. If so, it is a really bad
     // situation (e.g., disk corruption).
-    CHECK_SOME(run.forkedPid);
+    CHECK_SOME(run.forkedPid)
+      << "Failed to get forked pid for executor " << state.id
+      << " of framework " << id;
+
     executor->pid = run.libprocessPid.get();
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/83d5c257/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index af94f3c..8ba605b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -243,7 +243,7 @@ protected:
   Future<Nothing> _recover(const state::SlaveState& state, bool reconnect);
 
   // Helper to recover a framework from the specified state.
-  void recoverFramework(const state::FrameworkState& state, bool reconnect);
+  void recoverFramework(const state::FrameworkState& state);
 
   // Removes and garbage collects the executor.
   void removeExecutor(Framework* framework, Executor* executor);