Posted to commits@mesos.apache.org by vi...@apache.org on 2015/12/05 00:27:30 UTC

[2/3] mesos git commit: Added functionality for Subscribe/Subscribed workflow for HTTP executors.

Added functionality for Subscribe/Subscribed workflow for HTTP
executors.

This change adds the functionality for executors to `Subscribe` via the
`api/v1/executor` endpoint. It also writes a marker file as part of the
`Subscribe` call if framework checkpointing is enabled. The agent can
then use this marker during recovery to wait for the executor to
reconnect.
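
For reference, the agent expects a serialized `mesos::executor::Call`
message with type `SUBSCRIBE` on this endpoint. Below is a minimal
sketch (not part of this commit; the ID values are placeholders) of
how an HTTP based executor might populate the call, using the same
protobuf fields the diff below consumes:

    #include <mesos/executor/executor.pb.h>  // Generated header; path assumed.

    using mesos::executor::Call;

    Call buildSubscribe()
    {
      Call call;
      call.set_type(Call::SUBSCRIBE);
      call.mutable_framework_id()->set_value("<framework-id>");
      call.mutable_executor_id()->set_value("<executor-id>");

      Call::Subscribe* subscribe = call.mutable_subscribe();

      // On reconnection, resend unacknowledged tasks and status
      // updates so the agent can reconcile state (see the
      // STAGING -> TASK_LOST handling in slave.cpp below), e.g.:
      //   subscribe->add_tasks()->CopyFrom(unackedTask);      // TaskInfo
      //   subscribe->add_updates()->CopyFrom(unackedUpdate);  // Call::Update

      return call;
    }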

Review: https://reviews.apache.org/r/38877


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2c3faa8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2c3faa8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2c3faa8

Branch: refs/heads/master
Commit: a2c3faa8a08f60381c8b11c40fc51d1c877e749b
Parents: ebff7ce
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:25:11 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:25:13 2015 -0800

----------------------------------------------------------------------
 src/slave/http.cpp  |   3 +
 src/slave/slave.cpp | 206 ++++++++++++++++++++++++++++++++++++++++++++---
 src/slave/slave.hpp |   7 ++
 3 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index eeebc79..cef568d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -308,6 +308,9 @@ Future<Response> Slave::Http::executor(const Request& request) const
       ok.type = Response::PIPE;
       ok.reader = pipe.reader();
 
+      HttpConnection http {pipe.writer(), responseContentType};
+      slave->subscribe(http, call.subscribe(), framework, executor);
+
       return ok;
     }
 
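The `HttpConnection` constructed above bundles the write end of the
streaming response pipe with the negotiated content type. Its actual
definition lives in src/slave/slave.hpp; the following is only a rough
sketch of the shape implied by the `send()`/`close()` call sites in
the subscribe() hunk below:

    // Rough sketch inferred from this commit's call sites; the real
    // struct also handles serialization details for events. The
    // 'serialize' helper is assumed to pick protobuf or JSON based
    // on the content type.
    struct HttpConnection
    {
      HttpConnection(
          const process::http::Pipe::Writer& _writer,
          ContentType _contentType)
        : writer(_writer), contentType(_contentType) {}

      // Serialize 'message' in the negotiated content type and stream
      // it to the executor over the response pipe.
      template <typename Message>
      bool send(const Message& message)
      {
        return writer.write(serialize(contentType, message));
      }

      bool close()
      {
        return writer.close();
      }

      process::http::Pipe::Writer writer;
      ContentType contentType;
    };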

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a33187e..9bd86e1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -87,6 +87,8 @@
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+using mesos::executor::Call;
+
 using mesos::slave::QoSController;
 using mesos::slave::QoSCorrection;
 using mesos::slave::ResourceEstimator;
@@ -2398,6 +2400,176 @@ void Slave::_statusUpdateAcknowledgement(
 }
 
 
+void Slave::subscribe(
+    HttpConnection http,
+    const Call::Subscribe& subscribe,
+    Framework* framework,
+    Executor* executor)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(executor);
+
+  LOG(INFO) << "Received Subscribe request for HTTP executor " << *executor;
+
+  CHECK(state == DISCONNECTED || state == RUNNING ||
+        state == TERMINATING) << state;
+
+  if (state == TERMINATING) {
+    LOG(WARNING) << "Shutting down executor " << *executor << " as the slave "
+                 << "is terminating";
+    http.send(ShutdownExecutorMessage());
+    http.close();
+    return;
+  }
+
+  CHECK(framework->state == Framework::RUNNING ||
+        framework->state == Framework::TERMINATING)
+    << framework->state;
+
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Shutting down executor " << *executor << " as the "
+                 << "framework is terminating";
+    http.send(ShutdownExecutorMessage());
+    http.close();
+    return;
+  }
+
+  switch (executor->state) {
+    case Executor::TERMINATING:
+    case Executor::TERMINATED:
+      // TERMINATED is possible if the executor forks, the parent process
+      // terminates and the child process (driver) tries to register!
+      LOG(WARNING) << "Shutting down executor " << *executor
+                   << " because it is in unexpected state " << executor->state;
+      http.send(ShutdownExecutorMessage());
+      http.close();
+      break;
+    case Executor::RUNNING:
+    case Executor::REGISTERING: {
+      // Close the earlier connection if one existed. This can even
+      // be a retried Subscribe request from an already connected
+      // executor.
+      if (executor->http.isSome()) {
+        LOG(WARNING) << "Closing already existing HTTP connection from "
+                     << "executor " << *executor;
+        executor->http->close();
+      }
+
+      executor->state = Executor::RUNNING;
+
+      // Save the connection for the executor.
+      executor->http = http;
+      executor->pid = None();
+
+      if (framework->info.checkpoint()) {
+        // Write a marker file to indicate that this executor
+        // is HTTP based.
+        const string path = paths::getExecutorHttpMarkerPath(
+            metaDir,
+            info.id(),
+            framework->id(),
+            executor->id,
+            executor->containerId);
+
+        LOG(INFO) << "Creating a marker file for HTTP based executor "
+                  << *executor << " at path '" << path << "'";
+        CHECK_SOME(os::touch(path));
+      }
+
+      // Tell executor it's registered and give it any queued tasks.
+      ExecutorRegisteredMessage message;
+      message.mutable_executor_info()->MergeFrom(executor->info);
+      message.mutable_framework_id()->MergeFrom(framework->id());
+      message.mutable_framework_info()->MergeFrom(framework->info);
+      message.mutable_slave_id()->MergeFrom(info.id());
+      message.mutable_slave_info()->MergeFrom(info);
+      executor->send(message);
+
+      // Handle all the pending updates.
+      // The status update manager might have already checkpointed some
+      // of these pending updates (for example, if the slave died right
+      // after it checkpointed the update but before it could send the
+      // ACK to the executor). This is ok because the status update
+      // manager correctly handles duplicate updates.
+      foreach (const Call::Update& update, subscribe.updates()) {
+        // NOTE: This also updates the executor's resources!
+        statusUpdate(protobuf::createStatusUpdate(
+            framework->id(),
+            update.status(),
+            info.id()),
+            None());
+      }
+
+      // Update the resource limits for the container. Note that the
+      // resource limits include the currently queued tasks because we
+      // want the container to have enough resources to hold the
+      // upcoming tasks.
+      Resources resources = executor->resources;
+
+      // TODO(jieyu): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (const TaskInfo& task, executor->queuedTasks.values()) {
+        resources += task.resources();
+      }
+
+      containerizer->update(executor->containerId, resources)
+        .onAny(defer(self(),
+                     &Self::runTasks,
+                     lambda::_1,
+                     framework->id(),
+                     executor->id,
+                     executor->containerId,
+                     executor->queuedTasks.values()));
+
+      hashmap<TaskID, TaskInfo> unackedTasks;
+      foreach (const TaskInfo& task, subscribe.tasks()) {
+        unackedTasks[task.task_id()] = task;
+      }
+
+      // Now, if there is any task still in STAGING state and not in
+      // unacknowledged 'tasks' known to the executor, the slave must
+      // have died before the executor received the task! We should
+      // transition it to TASK_LOST. We only consider/store
+      // unacknowledged 'tasks' at the executor driver because if a
+      // task has been acknowledged, the slave must have received
+      // an update for that task and transitioned it out of STAGING!
+      // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
+      // 'Task' so that we can relaunch such tasks! Currently we
+      // don't do it because 'TaskInfo.data' could be huge.
+      // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (Task* task, executor->launchedTasks.values()) {
+        if (task->state() == TASK_STAGING &&
+            !unackedTasks.contains(task->task_id())) {
+          LOG(INFO) << "Transitioning STAGED task " << task->task_id()
+                    << " to LOST because it is unknown to the executor "
+                    << executor->id;
+
+          const StatusUpdate update = protobuf::createStatusUpdate(
+              framework->id(),
+              info.id(),
+              task->task_id(),
+              TASK_LOST,
+              TaskStatus::SOURCE_SLAVE,
+              UUID::random(),
+              "Task launched during slave restart",
+              TaskStatus::REASON_SLAVE_RESTARTED,
+              executor->id);
+
+          statusUpdate(update, UPID());
+        }
+      }
+
+      break;
+    }
+    default:
+      LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
+                 << executor->state;
+      break;
+  }
+}
+
+
 void Slave::registerExecutor(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -5206,18 +5378,28 @@ void Framework::recoverExecutor(const ExecutorState& state)
   Executor* executor = new Executor(
       slave, id(), state.info.get(), latest, directory, info.checkpoint());
 
-  // Recover the libprocess PID if possible.
-  if (run.get().libprocessPid.isSome()) {
-    // When recovering in non-strict mode, the assumption is that the
-    // slave can die after checkpointing the forked pid but before the
-    // libprocess pid. So, it is not possible for the libprocess pid
-    // to exist but not the forked pid. If so, it is a really bad
-    // situation (e.g., disk corruption).
-    CHECK_SOME(run.get().forkedPid)
-      << "Failed to get forked pid for executor " << state.id
-      << " of framework " << id();
-
-    executor->pid = run.get().libprocessPid.get();
+  // Recover the libprocess PID if possible for PID based executors.
+  if (run.get().http.isSome()) {
+    if (!run.get().http.get()) {
+      // When recovering in non-strict mode, the assumption is that the
+      // slave can die after checkpointing the forked pid but before the
+      // libprocess pid. So, it is not possible for the libprocess pid
+      // to exist but not the forked pid. If so, it is a really bad
+      // situation (e.g., disk corruption).
+      CHECK_SOME(run.get().forkedPid)
+        << "Failed to get forked pid for executor " << state.id
+        << " of framework " << id();
+
+      executor->pid = run.get().libprocessPid.get();
+    } else {
+      // We set the PID to None() to signify that this is an HTTP based
+      // executor.
+      executor->pid = None();
+    }
+  } else {
+    // We set the PID to UPID() to signify that the connection type for this
+    // executor is unknown.
+    executor->pid = UPID();
   }
 
   // And finally recover all the executor's tasks.
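
The recovery logic above distinguishes three cases via the
checkpointed Option<bool> `http` field: true (HTTP based executor,
marker file present), false (PID based executor), and none (the
checkpoint predates this change, so the connection type is unknown).
A hedged sketch, with illustrative variable names, of how state
recovery might derive that field from the marker file written in
subscribe() (the actual logic lives in src/slave/state.cpp):

    // Illustrative only; not part of this diff.
    const std::string markerPath = paths::getExecutorHttpMarkerPath(
        metaDir, slaveId, frameworkId, executorId, containerId);

    Option<bool> http = None();   // Unknown: pre-upgrade checkpoint.
    if (os::exists(markerPath)) {
      http = true;                // Marker present: HTTP based executor.
    } else if (libprocessPid.isSome()) {
      http = false;               // libprocess pid only: PID based.
    }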

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5ee133a..b7586ce 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -88,6 +88,7 @@ using namespace process;
 class StatusUpdateManager;
 struct Executor;
 struct Framework;
+struct HttpConnection;
 
 class Slave : public ProtobufProcess<Slave>
 {
@@ -161,6 +162,12 @@ public:
 
   void checkpointResources(const std::vector<Resource>& checkpointedResources);
 
+  void subscribe(
+    HttpConnection http,
+    const executor::Call::Subscribe& subscribe,
+    Framework* framework,
+    Executor* executor);
+
   void registerExecutor(
       const process::UPID& from,
       const FrameworkID& frameworkId,