You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/12/05 00:27:29 UTC

[1/3] mesos git commit: Added support for recovering RunState for HTTP based executors.

Repository: mesos
Updated Branches:
  refs/heads/master 9000e9190 -> 517e84669


Added support for recovering RunState for HTTP based executors.

This change adds support for recovering the `RunState` of `HTTP` based
executors. During agent recovery, `RunState.http` is set to false if a
libprocess pid is recovered for the executor, or to true if only the
HTTP marker file exists; otherwise it is left unset. The agent later
uses this to recover `HTTP` based executors.

Review: https://reviews.apache.org/r/39297


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ebff7ce0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ebff7ce0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ebff7ce0

Branch: refs/heads/master
Commit: ebff7ce003f08f2d92af7e28e843f7844f065cab
Parents: 9000e91
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:24:29 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:24:30 2015 -0800

----------------------------------------------------------------------
 src/slave/state.cpp | 56 ++++++++++++++++++++++++++++--------------------
 src/slave/state.hpp |  4 ++++
 2 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ebff7ce0/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index bc46cc6..9e470cb 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -500,37 +500,47 @@ Try<RunState> RunState::recover(
   path = paths::getLibprocessPidPath(
       rootDir, slaveId, frameworkId, executorId, containerId);
 
-  if (!os::exists(path)) {
-    // This could happen if the slave died before the executor
-    // registered with the slave.
-    LOG(WARNING)
-      << "Failed to find executor libprocess pid file '" << path << "'";
-    return state;
-  }
-
-  pid = os::read(path);
-
-  if (pid.isError()) {
-    message = "Failed to read executor libprocess pid from '" + path +
-              "': " + pid.error();
+  if (os::exists(path)) {
+    pid = os::read(path);
+
+    if (pid.isError()) {
+      message = "Failed to read executor libprocess pid from '" + path +
+                "': " + pid.error();
+
+      if (strict) {
+        return Error(message);
+      } else {
+        LOG(WARNING) << message;
+        state.errors++;
+        return state;
+      }
+    }
 
-    if (strict) {
-      return Error(message);
-    } else {
-      LOG(WARNING) << message;
-      state.errors++;
+    if (pid.get().empty()) {
+      // This could happen if the slave died after opening the file for
+      // writing but before it checkpointed anything.
+      LOG(WARNING) << "Found empty executor libprocess pid file '" << path
+                   << "'";
       return state;
     }
+
+    state.libprocessPid = process::UPID(pid.get());
+    state.http = false;
+
+    return state;
   }
 
-  if (pid.get().empty()) {
-    // This could happen if the slave died after opening the file for
-    // writing but before it checkpointed anything.
-    LOG(WARNING) << "Found empty executor libprocess pid file '" << path << "'";
+  path = paths::getExecutorHttpMarkerPath(
+      rootDir, slaveId, frameworkId, executorId, containerId);
+
+  if (!os::exists(path)) {
+    // This could happen if the slave died before the executor
+    // registered with the slave.
+    LOG(WARNING) << "Failed to find executor libprocess pid/http marker file";
     return state;
   }
 
-  state.libprocessPid = process::UPID(pid.get());
+  state.http = true;
 
   return state;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ebff7ce0/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index eb6b06c..d4235d5 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -207,6 +207,10 @@ struct RunState
   Option<pid_t> forkedPid;
   Option<process::UPID> libprocessPid;
 
+  // Whether the executor is connected via HTTP; None() when the connection
+  // type is unknown.
+  Option<bool> http;
+
   // Executor terminated and all its updates acknowledged.
   bool completed;
 


[2/3] mesos git commit: Added functionality for Subscribe/Subscribed workflow for HTTP executors.

Posted by vi...@apache.org.
Added functionality for Subscribe/Subscribed workflow for HTTP
executors.

This change lets executors `Subscribe` via the `api/v1/executor`
endpoint. If framework checkpointing is enabled, it also writes a
marker file while handling the `Subscribe` call. During recovery, the
agent can then use this marker to wait for the executor to reconnect.

Review: https://reviews.apache.org/r/38877


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2c3faa8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2c3faa8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2c3faa8

Branch: refs/heads/master
Commit: a2c3faa8a08f60381c8b11c40fc51d1c877e749b
Parents: ebff7ce
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:25:11 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:25:13 2015 -0800

----------------------------------------------------------------------
 src/slave/http.cpp  |   3 +
 src/slave/slave.cpp | 206 ++++++++++++++++++++++++++++++++++++++++++++---
 src/slave/slave.hpp |   7 ++
 3 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index eeebc79..cef568d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -308,6 +308,9 @@ Future<Response> Slave::Http::executor(const Request& request) const
       ok.type = Response::PIPE;
       ok.reader = pipe.reader();
 
+      HttpConnection http {pipe.writer(), responseContentType};
+      slave->subscribe(http, call.subscribe(), framework, executor);
+
       return ok;
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a33187e..9bd86e1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -87,6 +87,8 @@
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+using mesos::executor::Call;
+
 using mesos::slave::QoSController;
 using mesos::slave::QoSCorrection;
 using mesos::slave::ResourceEstimator;
@@ -2398,6 +2400,176 @@ void Slave::_statusUpdateAcknowledgement(
 }
 
 
+// Handles a `Subscribe` call from an HTTP based executor. Invoked by
+// `Slave::Http::executor()` with the writer end of the streaming response
+// pipe wrapped in `http`.
+//
+// If the agent or the framework is terminating, or the executor is
+// TERMINATING/TERMINATED, a `ShutdownExecutorMessage` is sent over `http`
+// and the connection is closed. If the executor is REGISTERING or RUNNING,
+// `http` replaces (and closes) any earlier connection, the executor is
+// marked RUNNING, a marker file is checkpointed when the framework
+// checkpoints, `ExecutorRegisteredMessage` is sent, the updates and tasks
+// carried in `subscribe` are reconciled, and queued tasks are passed to
+// `runTasks()` once the container's resources have been updated.
+//
+// Agent or framework states outside the CHECKs below, and executor states
+// outside the handled cases, are fatal.
+void Slave::subscribe(
+    HttpConnection http,
+    const Call::Subscribe& subscribe,
+    Framework* framework,
+    Executor* executor)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(executor);
+
+  LOG(INFO) << "Received Subscribe request for HTTP executor " << *executor;
+
+  CHECK(state == DISCONNECTED || state == RUNNING ||
+        state == TERMINATING) << state;
+
+  // Refuse the subscription while the agent itself is shutting down.
+  if (state == TERMINATING) {
+    LOG(WARNING) << "Shutting down executor " << *executor << " as the slave "
+                 << "is terminating";
+    http.send(ShutdownExecutorMessage());
+    http.close();
+    return;
+  }
+
+  CHECK(framework->state == Framework::RUNNING ||
+        framework->state == Framework::TERMINATING)
+    << framework->state;
+
+  // Likewise, refuse it while the owning framework is shutting down.
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Shutting down executor " << *executor << " as the "
+                 << "framework is terminating";
+    http.send(ShutdownExecutorMessage());
+    http.close();
+    return;
+  }
+
+  switch (executor->state) {
+    case Executor::TERMINATING:
+    case Executor::TERMINATED:
+      // TERMINATED is possible if the executor forks, the parent process
+      // terminates and the child process (driver) tries to register!
+      LOG(WARNING) << "Shutting down executor " << *executor
+                   << " because it is in unexpected state " << executor->state;
+      http.send(ShutdownExecutorMessage());
+      http.close();
+      break;
+    case Executor::RUNNING:
+    case Executor::REGISTERING: {
+      // Close the earlier connection if one existed. This can even
+      // be a retried Subscribe request from an already connected
+      // executor.
+      if (executor->http.isSome()) {
+        LOG(WARNING) << "Closing already existing HTTP connection from "
+                     << "executor " << *executor;
+        executor->http->close();
+      }
+
+      executor->state = Executor::RUNNING;
+
+      // Save the connection for the executor. A `None()` pid is the marker
+      // for an HTTP based executor (the same convention used when
+      // recovering executors in `Framework::recoverExecutor()`).
+      executor->http = http;
+      executor->pid = None();
+
+      if (framework->info.checkpoint()) {
+        // Write a marker file to indicate that this executor
+        // is HTTP based. `RunState::recover()` checks for this marker
+        // (via `paths::getExecutorHttpMarkerPath`) to set `RunState.http`
+        // when the agent restarts.
+        const string path = paths::getExecutorHttpMarkerPath(
+            metaDir,
+            info.id(),
+            framework->id(),
+            executor->id,
+            executor->containerId);
+
+        LOG(INFO) << "Creating a marker file for HTTP based executor "
+                  << *executor << " at path '" << path << "'";
+        // NOTE(review): CHECK_SOME aborts the agent if the marker file
+        // cannot be created.
+        CHECK_SOME(os::touch(path));
+      }
+
+      // Tell executor it's registered and give it any queued tasks.
+      // NOTE(review): with `executor->http` set above, this send presumably
+      // goes over the HTTP connection; confirm in `Executor::send()`.
+      ExecutorRegisteredMessage message;
+      message.mutable_executor_info()->MergeFrom(executor->info);
+      message.mutable_framework_id()->MergeFrom(framework->id());
+      message.mutable_framework_info()->MergeFrom(framework->info);
+      message.mutable_slave_id()->MergeFrom(info.id());
+      message.mutable_slave_info()->MergeFrom(info);
+      executor->send(message);
+
+      // Handle all the pending updates.
+      // The status update manager might have already checkpointed some
+      // of these pending updates (for example, if the slave died right
+      // after it checkpointed the update but before it could send the
+      // ACK to the executor). This is ok because the status update
+      // manager correctly handles duplicate updates.
+      // NOTE(review): these executor-sent updates pass `None()` as the pid,
+      // while the agent-generated TASK_LOST updates below pass `UPID()`;
+      // confirm both mean "no executor pid" to `statusUpdate()`.
+      foreach (const Call::Update& update, subscribe.updates()) {
+        // NOTE: This also updates the executor's resources!
+        statusUpdate(protobuf::createStatusUpdate(
+            framework->id(),
+            update.status(),
+            info.id()),
+            None());
+      }
+
+      // Update the resource limits for the container. Note that the
+      // resource limits include the currently queued tasks because we
+      // want the container to have enough resources to hold the
+      // upcoming tasks.
+      Resources resources = executor->resources;
+
+      // TODO(jieyu): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (const TaskInfo& task, executor->queuedTasks.values()) {
+        resources += task.resources();
+      }
+
+      // Queued tasks are handed to `runTasks()` (on this actor) once the
+      // containerizer update completes, whether or not it succeeded.
+      containerizer->update(executor->containerId, resources)
+        .onAny(defer(self(),
+                     &Self::runTasks,
+                     lambda::_1,
+                     framework->id(),
+                     executor->id,
+                     executor->containerId,
+                     executor->queuedTasks.values()));
+
+      // Index the tasks the executor reports as unacknowledged by task ID.
+      hashmap<TaskID, TaskInfo> unackedTasks;
+      foreach (const TaskInfo& task, subscribe.tasks()) {
+        unackedTasks[task.task_id()] = task;
+      }
+
+      // Now, if there is any task still in STAGING state and not in
+      // unacknowledged 'tasks' known to the executor, the slave must
+      // have died before the executor received the task! We should
+      // transition it to TASK_LOST. We only consider/store
+      // unacknowledged 'tasks' at the executor driver because if a
+      // task has been acknowledged, the slave must have received
+      // an update for that task and transitioned it out of STAGING!
+      // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
+      // 'Task' so that we can relaunch such tasks! Currently we
+      // don't do it because 'TaskInfo.data' could be huge.
+      // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (Task* task, executor->launchedTasks.values()) {
+        if (task->state() == TASK_STAGING &&
+            !unackedTasks.contains(task->task_id())) {
+          LOG(INFO) << "Transitioning STAGED task " << task->task_id()
+                    << " to LOST because it is unknown to the executor "
+                    << executor->id;
+
+          const StatusUpdate update = protobuf::createStatusUpdate(
+              framework->id(),
+              info.id(),
+              task->task_id(),
+              TASK_LOST,
+              TaskStatus::SOURCE_SLAVE,
+              UUID::random(),
+              "Task launched during slave restart",
+              TaskStatus::REASON_SLAVE_RESTARTED,
+              executor->id);
+
+          statusUpdate(update, UPID());
+        }
+      }
+
+      break;
+    }
+    default:
+      LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
+                 << executor->state;
+      break;
+  }
+}
+
+
 void Slave::registerExecutor(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -5206,18 +5378,28 @@ void Framework::recoverExecutor(const ExecutorState& state)
   Executor* executor = new Executor(
       slave, id(), state.info.get(), latest, directory, info.checkpoint());
 
-  // Recover the libprocess PID if possible.
-  if (run.get().libprocessPid.isSome()) {
-    // When recovering in non-strict mode, the assumption is that the
-    // slave can die after checkpointing the forked pid but before the
-    // libprocess pid. So, it is not possible for the libprocess pid
-    // to exist but not the forked pid. If so, it is a really bad
-    // situation (e.g., disk corruption).
-    CHECK_SOME(run.get().forkedPid)
-      << "Failed to get forked pid for executor " << state.id
-      << " of framework " << id();
-
-    executor->pid = run.get().libprocessPid.get();
+  // Recover the libprocess PID if possible for PID based executors.
+  if (run.get().http.isSome()) {
+    if (!run.get().http.get()) {
+      // When recovering in non-strict mode, the assumption is that the
+      // slave can die after checkpointing the forked pid but before the
+      // libprocess pid. So, it is not possible for the libprocess pid
+      // to exist but not the forked pid. If so, it is a really bad
+      // situation (e.g., disk corruption).
+      CHECK_SOME(run.get().forkedPid)
+        << "Failed to get forked pid for executor " << state.id
+        << " of framework " << id();
+
+      executor->pid = run.get().libprocessPid.get();
+    } else {
+      // We set the PID to None() to signify that this is a HTTP based
+      // executor.
+      executor->pid = None();
+    }
+  } else {
+    // We set the PID to UPID() to signify that the connection type for this
+    // executor is unknown.
+    executor->pid = UPID();
   }
 
   // And finally recover all the executor's tasks.

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2c3faa8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5ee133a..b7586ce 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -88,6 +88,7 @@ using namespace process;
 class StatusUpdateManager;
 struct Executor;
 struct Framework;
+struct HttpConnection;
 
 class Slave : public ProtobufProcess<Slave>
 {
@@ -161,6 +162,12 @@ public:
 
   void checkpointResources(const std::vector<Resource>& checkpointedResources);
 
+  void subscribe(
+    HttpConnection http,
+    const executor::Call::Subscribe& subscribe,
+    Framework* framework,
+    Executor* executor);
+
   void registerExecutor(
       const process::UPID& from,
       const FrameworkID& frameworkId,


[3/3] mesos git commit: Added test for the Subscribe->Subscribed workflow for the Executor HTTP API.

Posted by vi...@apache.org.
Added test for the Subscribe->Subscribed workflow for the Executor
HTTP API.

This change adds a basic test to validate the Subscribe->Subscribed
workflow on the agent's `api/v1/executor` endpoint.

Review: https://reviews.apache.org/r/38878


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/517e8466
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/517e8466
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/517e8466

Branch: refs/heads/master
Commit: 517e84669790f833f6cf0e7911968a3591c93419
Parents: a2c3faa
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:26:10 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:26:11 2015 -0800

----------------------------------------------------------------------
 src/tests/executor_http_api_tests.cpp | 108 +++++++++++++++++++++++++++++
 1 file changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/517e8466/src/tests/executor_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp
index fe9df1f..1be657c 100644
--- a/src/tests/executor_http_api_tests.cpp
+++ b/src/tests/executor_http_api_tests.cpp
@@ -25,34 +25,45 @@
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
+#include <process/message.hpp>
 #include <process/pid.hpp>
 
 #include "common/http.hpp"
+#include "common/recordio.hpp"
 
 #include "master/master.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 
 using mesos::internal::master::Master;
 
+using mesos::internal::recordio::Reader;
+
 using mesos::internal::slave::Slave;
 
 using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
 
 using process::Clock;
 using process::Future;
+using process::Message;
 using process::PID;
 
 using process::http::BadRequest;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::OK;
+using process::http::Pipe;
 using process::http::Response;
 using process::http::UnsupportedMediaType;
 
+using recordio::Decoder;
+
 using std::string;
 using std::vector;
 
+using testing::Eq;
 using testing::WithParamInterface;
 
 namespace mesos {
@@ -694,6 +705,103 @@ TEST_P(ExecutorHttpApiTest, StatusUpdateCallFailedValidation)
 }
 
 
+// Verifies that an HTTP based executor receives a `SUBSCRIBED` event in
+// response to a `SUBSCRIBE` call on the agent's `api/v1/executor` endpoint.
+// The test is parameterized by the content type used for the request body
+// and the `Accept` header.
+TEST_P(ExecutorHttpApiTest, Subscribe)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  MockExecutor exec(executorId);
+
+  // NOTE(review): the `TestContainerizer` presumably launches `exec` through
+  // the regular executor driver; that driver's `RegisterExecutorMessage` is
+  // dropped below so the HTTP `Subscribe` call is what registers the executor.
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+
+  Future<Message> registerExecutorMessage =
+    DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
+  driver.launchTasks(offers.get()[0].id(), {taskInfo});
+
+  // Drop the `RegisterExecutorMessage` and then send a `Subscribe` request
+  // from the HTTP based executor.
+  AWAIT_READY(registerExecutorMessage);
+
+  // Build a SUBSCRIBE call for the framework/executor launched above.
+  Call call;
+  call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+  call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+  call.set_type(Call::SUBSCRIBE);
+
+  call.mutable_subscribe();
+
+  // Retrieve the parameter passed as content type to this test.
+  const ContentType contentType = GetParam();
+
+  process::http::Headers headers;
+  headers["Accept"] = stringify(contentType);
+
+  // Use a streaming POST so the response body can be read incrementally
+  // through a pipe reader.
+  Future<Response> response = process::http::streaming::post(
+      slave.get(),
+      "api/v1/executor",
+      headers,
+      serialize(contentType, call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ(stringify(contentType),
+                 response.get().headers.get("Content-Type"));
+
+  EXPECT_SOME_EQ("chunked",
+                 response.get().headers.get("Transfer-Encoding"));
+
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  // Decode the RecordIO-framed event stream from the response pipe,
+  // deserializing each record according to `contentType`.
+  auto deserializer =
+    lambda::bind(deserialize<Event>, contentType, lambda::_1);
+
+  Reader<Event> responseDecoder(
+      Decoder<Event>(deserializer),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and if the ExecutorID matches.
+  // NOTE(review): every other assertion in this test lists the expected
+  // value first; consider swapping the arguments of the second ASSERT_EQ
+  // so `call.executor_id()` comes first.
+  ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+  ASSERT_EQ(event.get().get().subscribed().executor_info().executor_id(),
+            call.executor_id());
+
+  // NOTE(review): `driver` was started above but is never stopped; consider
+  // calling `driver.stop()` and `driver.join()` before `Shutdown()`.
+  reader.get().close();
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {