You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/02/04 23:42:38 UTC

[5/5] mesos git commit: Added tests for recovery for HTTP based executors.

Added tests for recovery for HTTP based executors.

Review: https://reviews.apache.org/r/42186/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2dd8dc85
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2dd8dc85
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2dd8dc85

Branch: refs/heads/master
Commit: 2dd8dc85a1443b2192c548cabeab41aad6da3e17
Parents: 8bb8667
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:42 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:42 2016 -0800

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 228 ++++++++++++++++++++++++++++++++
 1 file changed, 228 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2dd8dc85/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6683a08..bccdf37 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -21,7 +21,10 @@
 
 #include <gtest/gtest.h>
 
+#include <mesos/v1/executor/executor.hpp>
+
 #include <mesos/executor.hpp>
+#include <mesos/http.hpp>
 #include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
@@ -58,6 +61,7 @@
 
 #include "tests/allocator.hpp"
 #include "tests/containerizer.hpp"
+#include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
@@ -69,6 +73,8 @@ using google::protobuf::RepeatedPtrField;
 
 using mesos::internal::master::Master;
 
+using mesos::v1::executor::Call;
+
 using std::map;
 using std::string;
 using std::vector;
@@ -411,6 +417,111 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
 
 
 // The slave is stopped before the first update for a task is received
+// from the HTTP based executor. When it comes back up with recovery=reconnect,
+// make sure the executor subscribes and the slave properly sends the update.
+TYPED_TEST(SlaveRecoveryTest, ReconnectHTTPExecutor)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer1);
+
+  // Start the slave with a static process ID. This allows the executor to
+  // reconnect with the slave upon a process restart.
+  const std::string id("slave");
+
+  Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // TODO(anand): Use the HTTP based command executor once MESOS-3558 is
+  // resolved.
+  ExecutorInfo executorInfo =
+    CREATE_EXECUTOR_INFO(
+        "http",
+        path::join(tests::flags.build_dir, "src", "test-http-executor"));
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<v1::executor::Call> updateCall1 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  Future<v1::executor::Call> updateCall2 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  // Stop the slave before the status updates are received.
+  AWAIT_READY(updateCall1);
+  AWAIT_READY(updateCall2);
+
+  this->Stop(slave.get());
+  delete containerizer1.get();
+
+  Future<v1::executor::Call> subscribeCall =
+    FUTURE_HTTP_CALL(Call(), Call::SUBSCRIBE, _, ContentType::PROTOBUF);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  // Restart the slave (use same flags) with a new containerizer.
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer2);
+
+  slave = this->StartSlave(containerizer2.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the executor subscribes again.
+  AWAIT_READY(subscribeCall);
+
+  ASSERT_EQ(2, subscribeCall.get().subscribe().unacknowledged_updates().size());
+  ASSERT_EQ(1, subscribeCall.get().subscribe().unacknowledged_tasks().size());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+  delete containerizer2.get();
+}
+
+
+// The slave is stopped before the first update for a task is received
 // from the executor. When it comes back up with recovery=reconnect, make
 // sure the executor re-registers and the slave properly sends the update.
 TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
@@ -925,6 +1036,123 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
 }
 
 
+// The slave is stopped before a terminal update is received from the HTTP
+// based executor. The slave is then restarted in recovery=cleanup mode.
+// It kills the executor, and terminates. Master should then send TASK_LOST.
+TYPED_TEST(SlaveRecoveryTest, CleanupHTTPExecutor)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer1);
+
+  // Start the slave with a static process ID. This allows the executor to
+  // reconnect with the slave upon a process restart.
+  const std::string id("slave");
+
+  Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // TODO(anand): Use the HTTP based command executor once MESOS-3558 is
+  // resolved.
+  ExecutorInfo executorInfo =
+    CREATE_EXECUTOR_INFO(
+        "http",
+        path::join(tests::flags.build_dir, "src", "test-http-executor"));
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<v1::executor::Call> updateCall1 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  Future<v1::executor::Call> updateCall2 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  // Stop the slave before the status updates are received.
+  AWAIT_READY(updateCall1);
+  AWAIT_READY(updateCall2);
+
+  this->Stop(slave.get());
+  delete containerizer1.get();
+
+  // Slave in cleanup mode shouldn't re-register with the master and
+  // hence no offers should be made by the master.
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .Times(0);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+
+  EXPECT_CALL(sched, slaveLost(_, _))
+    .Times(AtMost(1));
+
+  // Restart the slave in 'cleanup' recovery mode with a new containerizer.
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer2);
+
+  flags.recover = "cleanup";
+  slave = this->StartSlave(containerizer2.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  // Now advance time until the reaper reaps the executor.
+  while (status.isPending()) {
+    Clock::advance(process::MAX_REAP_INTERVAL());
+    Clock::settle();
+  }
+
+  // Wait for recovery to complete.
+  AWAIT_READY(__recover);
+
+  // Scheduler should receive the TASK_LOST update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_LOST, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+  delete containerizer2.get();
+}
+
+
 // The slave is stopped after a non-terminal update is received.
 // Slave is restarted in recovery=cleanup mode. It kills the command
 // executor, and terminates. Master should then send TASK_LOST.