You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/13 07:24:00 UTC
svn commit: r1455817 [2/2] - in /incubator/mesos/trunk/src: exec/ launcher/
messages/ slave/ tests/
Modified: incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp?rev=1455817&r1=1455816&r2=1455817&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp Wed Mar 13 06:23:59 2013
@@ -32,14 +32,18 @@
#include <stout/option.hpp>
#include <stout/path.hpp>
+#include "common/process_utils.hpp"
#include "common/protobuf_utils.hpp"
#include "detector/detector.hpp"
+#include "linux/cgroups.hpp"
+
#include "master/allocator.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
+#include "slave/cgroups_isolation_module.hpp"
#include "slave/paths.hpp"
#include "slave/process_based_isolation_module.hpp"
#include "slave/reaper.hpp"
@@ -55,6 +59,7 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::slave;
using namespace mesos::internal::tests;
+using namespace mesos::internal::utils::process;
using namespace process;
@@ -62,6 +67,9 @@ using mesos::internal::master::Allocator
using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
+using mesos::internal::slave::CgroupsIsolationModule;
+using mesos::internal::slave::ProcessBasedIsolationModule;
+
using std::map;
using std::string;
using std::vector;
@@ -120,34 +128,29 @@ TEST_F(SlaveStateTest, CheckpointString)
}
-// TODO(vinod): Merge this with the fixture in status updates manager tests.
-class SlaveRecoveryTest : public ::testing::Test
+template <typename T>
+class SlaveRecoveryTest : public IsolationTest<T>
{
-protected:
+public:
static void SetUpTestCase()
{
+ IsolationTest<T>::SetUpTestCase();
+
// Enable checkpointing for the framework.
frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
-
- // Enable checkpointing on the slave.
- flags.checkpoint = true;
-
- // TODO(vinod): Do this for all the tests!
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
}
virtual void SetUp()
{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
+ IsolationTest<T>::SetUp();
- Try<string> workDir = os::mkdtemp();
- CHECK_SOME(workDir) << "Failed to mkdtemp";
- flags.work_dir = workDir.get();
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
// Always, drop the unregisterSlaveMessage sent by a slave when
- // its terminated. This will stop the master from removing the slave,
- // which is what we expect to happen in the real world when a slave exits.
+ // it is terminated. This will stop the master from removing the
+ // slave, which is what we expect to happen in the real world
+ // when a slave exits.
EXPECT_MESSAGE(Eq(UnregisterSlaveMessage().GetTypeName()), _, _)
.WillRepeatedly(Return(true));
@@ -155,39 +158,49 @@ protected:
m = new Master(a, &files);
master = process::spawn(m);
+ // Reset recovery slaveFlags.
+ this->slaveFlags.checkpoint = true;
+ this->slaveFlags.recover = "reconnect";
+ this->slaveFlags.safe = false;
+
startSlave();
}
virtual void TearDown()
{
- stopSlave();
+ // Wait for the executor to exit.
+ EXPECT_MESSAGE(Eq(UnregisterSlaveMessage().GetTypeName()), _, _)
+ .WillRepeatedly(Return(true));
+
+ stopSlave(true);
process::terminate(master);
process::wait(master);
delete m;
delete a;
- os::rmdir(flags.work_dir);
+ IsolationTest<T>::TearDown();
}
- void startSlave(const Option<string>& recover = None())
+protected:
+ void startSlave()
{
- if (recover.isSome()) {
- flags.recover = recover.get(); // Enable recovery.
- }
-
- isolationModule = new ProcessBasedIsolationModule();
- s = new Slave(flags, true, isolationModule, &files);
+ isolationModule = new T();
+ s = new Slave(this->slaveFlags, true, isolationModule, &files);
slave = process::spawn(s);
detector = new BasicMasterDetector(master, slave, true);
}
- void stopSlave()
+ void stopSlave(bool shutdown = false)
{
delete detector;
- process::terminate(slave);
+ if (shutdown) {
+ process::dispatch(slave, &Slave::shutdown);
+ } else {
+ process::terminate(slave);
+ }
process::wait(slave);
delete s;
@@ -197,7 +210,7 @@ protected:
HierarchicalDRFAllocatorProcess allocator;
Allocator *a;
Master* m;
- ProcessBasedIsolationModule* isolationModule;
+ IsolationModule* isolationModule;
Slave* s;
Files files;
BasicMasterDetector* detector;
@@ -206,16 +219,26 @@ protected:
PID<Master> master;
PID<Slave> slave;
static FrameworkInfo frameworkInfo;
- static flags::Flags<logging::Flags, slave::Flags> flags;
};
// Initialize static members here.
-FrameworkInfo SlaveRecoveryTest::frameworkInfo;
-flags::Flags<logging::Flags, slave::Flags> SlaveRecoveryTest::flags;
+template <typename T>
+FrameworkInfo SlaveRecoveryTest<T>::frameworkInfo;
+
+
+#ifdef __linux__
+typedef ::testing::Types<ProcessBasedIsolationModule, CgroupsIsolationModule>
+IsolationTypes;
+#else
+typedef ::testing::Types<ProcessBasedIsolationModule> IsolationTypes;
+#endif
-// Enable checkpointing on the slave and ensure SlaveState::recover works.
-TEST_F(SlaveRecoveryTest, RecoverSlaveState)
+TYPED_TEST_CASE(SlaveRecoveryTest, IsolationTypes);
+
+
+// Enable checkpointing on the slave and ensure recovery works.
+TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
{
// Message expectations.
process::Message message;
@@ -236,7 +259,7 @@ TEST_F(SlaveRecoveryTest, RecoverSlaveSt
process::Message message3;
trigger statusUpdateMsg;
- EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), Eq(master), _)
+ EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), Eq(this->master), _)
.WillOnce(DoAll(
SaveArgField<0>(&process::MessageEvent::message, &message3),
Trigger(&statusUpdateMsg),
@@ -252,20 +275,20 @@ TEST_F(SlaveRecoveryTest, RecoverSlaveSt
// Scheduler expectations.
FrameworkID frameworkId;
- EXPECT_CALL(sched, registered(_, _, _))
+ EXPECT_CALL(this->sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
trigger resourceOffersCall;
vector<Offer> offers;
- EXPECT_CALL(sched, resourceOffers(_, _))
+ EXPECT_CALL(this->sched, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
Trigger(&resourceOffersCall)))
.WillRepeatedly(Return());
- EXPECT_CALL(sched, statusUpdate(_, _))
+ EXPECT_CALL(this->sched, statusUpdate(_, _))
.WillRepeatedly(Return());
- MesosSchedulerDriver driver(&sched, frameworkInfo, master);
+ MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
driver.start();
@@ -308,7 +331,7 @@ TEST_F(SlaveRecoveryTest, RecoverSlaveSt
// Recover the state.
Result<state::SlaveState> recover =
- state::recover(paths::getMetaRootDir(flags.work_dir), true);
+ state::recover(paths::getMetaRootDir(this->slaveFlags.work_dir), true);
ASSERT_SOME(recover);
@@ -384,18 +407,27 @@ TEST_F(SlaveRecoveryTest, RecoverSlaveSt
.tasks[task.task_id()]
.acks.contains(ack.uuid()));
+ // Shut down the executor.
+ process::post(libprocessPid, ShutdownExecutorMessage());
+
driver.stop();
driver.join();
}
-// A slave is started with checkpointing enabled (recovery disabled).
// The slave is killed before the ACK for a status update is received.
-// The slave is then restarted with recovery enabled.
-// Ensure that SUM is properly recovered and re-sends the un-acked update.
-TEST_F(SlaveRecoveryTest, RecoverStatusUpdateManager)
+// When the slave comes back up it resends the unacknowledged update.
+TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
{
// Message expectations.
+ process::Message message;
+ trigger registerExecutorMsg;
+ EXPECT_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(
+ SaveArgField<0>(&process::MessageEvent::message, &message),
+ Trigger(&registerExecutorMsg),
+ Return(false)));
+
trigger statusUpdateAckMsg;
EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()), _, _)
.WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
@@ -404,24 +436,24 @@ TEST_F(SlaveRecoveryTest, RecoverStatusU
// Scheduler expectations.
FrameworkID frameworkId;
- EXPECT_CALL(sched, registered(_, _, _))
+ EXPECT_CALL(this->sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
trigger resourceOffersCall;
vector<Offer> offers;
- EXPECT_CALL(sched, resourceOffers(_, _))
+ EXPECT_CALL(this->sched, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
Trigger(&resourceOffersCall)))
.WillRepeatedly(Return());
TaskStatus status;
trigger statusUpdateCall;
- EXPECT_CALL(sched, statusUpdate(_, _))
+ EXPECT_CALL(this->sched, statusUpdate(_, _))
.WillOnce(Return())
.WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
Trigger(&statusUpdateCall)));
- MesosSchedulerDriver driver(&sched, frameworkInfo, master);
+ MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
driver.start();
@@ -434,19 +466,297 @@ TEST_F(SlaveRecoveryTest, RecoverStatusU
tasks.push_back(task); // Long-running task.
driver.launchTasks(offers[0].id(), tasks);
+ // Capture the executor pid.
+ WAIT_UNTIL(registerExecutorMsg);
+ UPID executorPid = message.from;
+
// Capture the ack.
WAIT_UNTIL(statusUpdateAckMsg);
- stopSlave();
+ this->stopSlave();
- // Restart the slave with recovery enabled.
- startSlave(Option<string>::some("reconnect"));
+ // Restart the slave.
+ this->startSlave();
WAIT_UNTIL(statusUpdateCall);
ASSERT_EQ(TASK_RUNNING, status.state());
+ // Shut down the executor.
+ process::post(executorPid, ShutdownExecutorMessage());
+
driver.stop();
driver.join();
}
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up with recovery=reconnect, make
+// sure the executor re-registers and the slave properly sends the update.
+TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
+{
+ // Message expectations.
+ trigger statusUpdateMsg;
+ EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(Trigger(&statusUpdateMsg),
+ Return(true))) // Drop the first update from the executor.
+ .WillRepeatedly(Return(false));
+
+ process::Message message;
+ trigger reregisterExecutorMessage;
+ EXPECT_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(
+ SaveArgField<0>(&process::MessageEvent::message, &message),
+ Trigger(&reregisterExecutorMessage),
+ Return(false)));
+
+ // Scheduler expectations.
+ FrameworkID frameworkId;
+ EXPECT_CALL(this->sched, registered(_, _, _));
+
+ trigger resourceOffersCall;
+ vector<Offer> offers;
+ EXPECT_CALL(this->sched, resourceOffers(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+ .WillRepeatedly(Return());
+
+ TaskStatus status;
+ trigger statusUpdateCall;
+ EXPECT_CALL(this->sched, statusUpdate(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
+ Trigger(&statusUpdateCall)))
+ .WillRepeatedly(Return());
+
+ MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+
+ driver.start();
+
+ WAIT_UNTIL(resourceOffersCall);
+
+ EXPECT_NE(0u, offers.size());
+
+ TaskInfo task = createTask(offers[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+ driver.launchTasks(offers[0].id(), tasks);
+
+ // Stop the slave before the status update is received.
+ WAIT_UNTIL(statusUpdateMsg);
+ this->stopSlave();
+
+ // Restart the slave.
+ this->startSlave();
+
+ // Ensure the executor re-registers.
+ WAIT_UNTIL(reregisterExecutorMessage);
+ UPID executorPid = message.from;
+
+ ReregisterExecutorMessage reregister;
+ reregister.ParseFromString(message.body);
+
+ // Executor should inform about the unacknowledged update.
+ ASSERT_EQ(1, reregister.updates_size());
+ const StatusUpdate& update = reregister.updates(0);
+ ASSERT_EQ(task.task_id(), update.status().task_id());
+ ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+ // Scheduler should receive the recovered update.
+ WAIT_UNTIL(statusUpdateCall);
+ ASSERT_EQ(TASK_RUNNING, status.state());
+
+ // Shut down the executor.
+ process::post(executorPid, ShutdownExecutorMessage());
+
+ driver.stop();
+ driver.join();
+}
+
+
+// The slave is stopped before the (command) executor is registered.
+// When it comes back up with recovery=reconnect, make sure the task is
+// properly transitioned to FAILED.
+TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
+{
+ // Message Expectations.
+ process::Message message;
+ trigger registerExecutorMsg;
+ EXPECT_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(
+ SaveArgField<0>(&process::MessageEvent::message, &message),
+ Trigger(&registerExecutorMsg),
+ Return(true))); // Drop the executor registration message.
+
+ // Scheduler expectations.
+ FrameworkID frameworkId;
+ EXPECT_CALL(this->sched, registered(_, _, _));
+
+ trigger resourceOffersCall;
+ vector<Offer> offers;
+ EXPECT_CALL(this->sched, resourceOffers(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+ .WillRepeatedly(Return());
+
+ TaskStatus status;
+ trigger statusUpdateCall;
+ EXPECT_CALL(this->sched, statusUpdate(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
+ Trigger(&statusUpdateCall)))
+ .WillRepeatedly(Return());
+
+ MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+
+ driver.start();
+
+ WAIT_UNTIL(resourceOffersCall);
+
+ EXPECT_NE(0u, offers.size());
+
+ TaskInfo task = createTask(offers[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+ driver.launchTasks(offers[0].id(), tasks);
+
+ // Stop the slave before the executor is registered.
+ WAIT_UNTIL(registerExecutorMsg);
+ UPID executorPid = message.from;
+ this->stopSlave();
+
+ // Restart the slave.
+ this->startSlave();
+
+ // Scheduler should receive the TASK_FAILED update.
+ WAIT_UNTIL(statusUpdateCall);
+ ASSERT_EQ(TASK_FAILED, status.state());
+
+ // Shut down the executor.
+ process::post(executorPid, ShutdownExecutorMessage());
+
+ driver.stop();
+ driver.join();
+}
+
+
+// The slave is stopped after a non-terminal update is received.
+// The command executor terminates when the slave is down.
+// When it comes back up with recovery=reconnect, make
+// sure the task is properly transitioned to FAILED.
+TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
+{
+ // Message Expectations.
+ process::Message message;
+ trigger registerExecutorMsg;
+ EXPECT_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(
+ SaveArgField<0>(&process::MessageEvent::message, &message),
+ Trigger(®isterExecutorMsg),
+ Return(false)));
+
+ // Scheduler expectations.
+ FrameworkID frameworkId;
+ EXPECT_CALL(this->sched, registered(_, _, _));
+
+ trigger resourceOffersCall;
+ vector<Offer> offers;
+ EXPECT_CALL(this->sched, resourceOffers(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+ .WillRepeatedly(Return());
+
+ TaskStatus status;
+ trigger statusUpdateCall1, statusUpdateCall2;
+ EXPECT_CALL(this->sched, statusUpdate(_, _))
+ .WillOnce(Trigger(&statusUpdateCall1))
+ .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
+ Trigger(&statusUpdateCall2)));
+
+ MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+
+ driver.start();
+
+ WAIT_UNTIL(resourceOffersCall);
+
+ EXPECT_NE(0u, offers.size());
+
+ TaskInfo task = createTask(offers[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+ driver.launchTasks(offers[0].id(), tasks);
+
+ // Capture the executor's pid.
+ WAIT_UNTIL(registerExecutorMsg);
+ UPID executorPid = message.from;
+
+ // Wait for TASK_RUNNING update.
+ WAIT_UNTIL(statusUpdateCall1);
+
+ sleep(1); // Give enough time for the ACK to be checkpointed.
+
+ this->stopSlave();
+
+ // Now shut down the executor, when the slave is down.
+ process::post(executorPid, ShutdownExecutorMessage());
+
+ // Restart the slave.
+ this->startSlave();
+
+ // Scheduler should receive the TASK_FAILED update.
+ WAIT_UNTIL(statusUpdateCall2);
+ ASSERT_EQ(TASK_FAILED, status.state());
+
+ driver.stop();
+ driver.join();
+}
+
+
+// The slave is stopped after a non-terminal update is received.
+// Slave is restarted in recovery=cleanup mode. It kills the command executor,
+// and transitions the task to FAILED.
+TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
+{
+ // Scheduler expectations.
+ FrameworkID frameworkId;
+ EXPECT_CALL(this->sched, registered(_, _, _));
+
+ trigger resourceOffersCall;
+ vector<Offer> offers;
+ EXPECT_CALL(this->sched, resourceOffers(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+ .WillRepeatedly(Return());
+
+ TaskStatus status;
+ trigger statusUpdateCall1, statusUpdateCall2;
+ EXPECT_CALL(this->sched, statusUpdate(_, _))
+ .WillOnce(Trigger(&statusUpdateCall1))
+ .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
+ Trigger(&statusUpdateCall2)));
+
+ MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+
+ driver.start();
+
+ WAIT_UNTIL(resourceOffersCall);
+
+ EXPECT_NE(0u, offers.size());
+
+ TaskInfo task = createTask(offers[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+ driver.launchTasks(offers[0].id(), tasks);
+
+ // Stop the slave before the executor is registered.
+ WAIT_UNTIL(statusUpdateCall1);
+
+ sleep(1); // Give enough time for the ACK to be checkpointed.
+
+ this->stopSlave();
+
+ // Restart the slave in 'cleanup' recovery mode.
+ this->slaveFlags.recover = "cleanup";
+ this->startSlave();
+
+ // Scheduler should receive the TASK_FAILED update.
+ WAIT_UNTIL(statusUpdateCall2);
+ ASSERT_EQ(TASK_FAILED, status.state());
+
+ driver.stop();
+ driver.join();
+}
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1455817&r1=1455816&r2=1455817&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Wed Mar 13 06:23:59 2013
@@ -37,11 +37,13 @@
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
+#include <stout/uuid.hpp>
#include "common/resources.hpp"
#include "common/type_utils.hpp"
@@ -65,6 +67,7 @@
#include "slave/isolation_module.hpp"
#include "slave/reaper.hpp"
#include "slave/slave.hpp"
+#include "slave/state.hpp"
#include "tests/flags.hpp"
@@ -770,25 +773,31 @@ public:
ResourceStatistics empty;
EXPECT_CALL(*this, usage(testing::_, testing::_))
.WillRepeatedly(Return(empty));
+
+ EXPECT_CALL(*this, recover(testing::_))
+ .WillRepeatedly(Return(Nothing()));
}
virtual ~TestingIsolationModule() {}
- virtual void initialize(const slave::Flags& flags,
- const Resources& resources,
- bool local,
- const process::PID<slave::Slave>& _slave)
+ virtual void initialize(
+ const slave::Flags& flags,
+ const Resources& resources,
+ bool local,
+ const process::PID<slave::Slave>& _slave)
{
slave = _slave;
}
- virtual void launchExecutor(const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Resources& resources,
- bool checkpoint,
- const Option<std::string>& path)
+ virtual void launchExecutor(
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const UUID& uuid,
+ const std::string& directory,
+ const Resources& resources,
+ const Option<std::string>& path)
{
if (executors.count(executorInfo.executor_id()) > 0) {
Executor* executor = executors[executorInfo.executor_id()];
@@ -800,16 +809,20 @@ public:
os::setenv("MESOS_LOCAL", "1");
os::setenv("MESOS_DIRECTORY", directory);
os::setenv("MESOS_SLAVE_PID", slave);
+ os::setenv("MESOS_SLAVE_ID", slaveId.value());
os::setenv("MESOS_FRAMEWORK_ID", frameworkId.value());
os::setenv("MESOS_EXECUTOR_ID", executorInfo.executor_id().value());
+ os::setenv("MESOS_CHECKPOINT", frameworkInfo.checkpoint() ? "1" : "0");
driver->start();
os::unsetenv("MESOS_LOCAL");
os::unsetenv("MESOS_DIRECTORY");
os::unsetenv("MESOS_SLAVE_PID");
+ os::unsetenv("MESOS_SLAVE_ID");
os::unsetenv("MESOS_FRAMEWORK_ID");
os::unsetenv("MESOS_EXECUTOR_ID");
+ os::unsetenv("MESOS_CHECKPOINT");
process::dispatch(
slave,
@@ -823,8 +836,9 @@ public:
}
}
- virtual void killExecutor(const FrameworkID& frameworkId,
- const ExecutorID& executorId)
+ virtual void killExecutor(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
{
if (drivers.count(executorId) > 0) {
MesosExecutorDriver* driver = drivers[executorId];
@@ -857,6 +871,10 @@ public:
const FrameworkID&,
const ExecutorID&));
+ MOCK_METHOD1(
+ recover,
+ process::Future<Nothing>(const Option<slave::state::SlaveState>&));
+
std::map<ExecutorID, std::string> directories;
private: