You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/08/26 21:52:14 UTC
git commit: Added a recovery timeout for executor driver self-termination.
Updated Branches:
refs/heads/master f00832a1d -> 5eb50d6e8
Added a recovery timeout for executor driver self-termination.
Review: https://reviews.apache.org/r/13791
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5eb50d6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5eb50d6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5eb50d6e
Branch: refs/heads/master
Commit: 5eb50d6e8da1f311de30ca5f1218bb5bcba2236c
Parents: f00832a
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 23 19:13:30 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Mon Aug 26 12:52:07 2013 -0700
----------------------------------------------------------------------
src/exec/exec.cpp | 64 +++++++++++++++++++++--
src/launcher/launcher.cpp | 10 +++-
src/launcher/launcher.hpp | 7 ++-
src/launcher/main.cpp | 26 ++++++++-
src/slave/cgroups_isolator.cpp | 3 +-
src/slave/constants.cpp | 1 +
src/slave/constants.hpp | 1 +
src/slave/flags.hpp | 9 ++++
src/slave/process_isolator.cpp | 3 +-
src/tests/slave_recovery_tests.cpp | 93 +++++++++++++++++++++++++++++++++
10 files changed, 206 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index ca61892..d370560 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -106,7 +106,8 @@ public:
const ExecutorID& _executorId,
bool _local,
const string& _directory,
- bool _checkpoint)
+ bool _checkpoint,
+ Duration _recoveryTimeout)
: ProcessBase(ID::generate("executor")),
slave(_slave),
driver(_driver),
@@ -115,10 +116,12 @@ public:
frameworkId(_frameworkId),
executorId(_executorId),
connected(false),
+ connection(UUID::random()),
local(_local),
aborted(false),
directory(_directory),
- checkpoint(_checkpoint)
+ checkpoint(_checkpoint),
+ recoveryTimeout(_recoveryTimeout)
{
install<ExecutorRegisteredMessage>(
&ExecutorProcess::registered,
@@ -195,6 +198,7 @@ protected:
VLOG(1) << "Executor registered on slave " << slaveId;
connected = true;
+ connection = UUID::random();
Stopwatch stopwatch;
if (FLAGS_v >= 1) {
@@ -216,6 +220,9 @@ protected:
VLOG(1) << "Executor re-registered on slave " << slaveId;
+ connected = true;
+ connection = UUID::random();
+
Stopwatch stopwatch;
if (FLAGS_v >= 1) {
stopwatch.start();
@@ -391,6 +398,23 @@ protected:
aborted = true;
}
+ void _recoveryTimeout(UUID _connection)
+ {
+ // If we're connected, no need to shut down the driver!
+ if (connected) {
+ return;
+ }
+
+ // We need to compare the connections here to ensure there have
+ // not been any subsequent re-registrations with the slave in the
+ // interim.
+ if (connection == _connection) {
+ VLOG(1) << "Recovery timeout of " << recoveryTimeout << " exceeded; "
+ << "Shutting down";
+ shutdown();
+ }
+ }
+
virtual void exited(const UPID& pid)
{
if (aborted) {
@@ -402,13 +426,21 @@ protected:
// successfully registered with the slave, the slave can reconnect with
// this executor when it comes back up and performs recovery!
if (checkpoint && connected) {
+ connected = false;
+
VLOG(1) << "Slave exited, but framework has checkpointing enabled. "
- << "Waiting to reconnect with slave " << slaveId;
+ << "Waiting " << recoveryTimeout << " to reconnect with slave "
+ << slaveId;
+
+ delay(recoveryTimeout, self(), &Self::_recoveryTimeout, connection);
+
return;
}
VLOG(1) << "Slave exited ... shutting down";
+ connected = false;
+
if (!local) {
// Start the Shutdown Process.
spawn(new ShutdownProcess(), true);
@@ -494,10 +526,12 @@ private:
FrameworkID frameworkId;
ExecutorID executorId;
bool connected; // Registered with the slave.
+ UUID connection; // UUID to identify the connection instance.
bool local;
bool aborted;
const string directory;
bool checkpoint;
+ Duration recoveryTimeout;
LinkedHashMap<UUID, StatusUpdate> updates; // Unacknowledged updates.
@@ -587,7 +621,7 @@ Status MesosExecutorDriver::start()
value = os::getenv("MESOS_SLAVE_PID");
slave = UPID(value);
if (!slave) {
- fatal("cannot parse MESOS_SLAVE_PID");
+ fatal("Cannot parse MESOS_SLAVE_PID '%s'", value.c_str());
}
// Get slave ID from environment.
@@ -615,6 +649,25 @@ Status MesosExecutorDriver::start()
checkpoint = false;
}
+ Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+
+ // Get the recovery timeout if checkpointing is enabled.
+ if (checkpoint) {
+ value = os::getenv("MESOS_RECOVERY_TIMEOUT", false);
+
+ if (!value.empty()) {
+ Try<Duration> _recoveryTimeout = Duration::parse(value);
+
+ if (_recoveryTimeout.isError()) {
+ fatal("Cannot parse MESOS_RECOVERY_TIMEOUT '%s': %s",
+ value.c_str(),
+ _recoveryTimeout.error().c_str());
+ }
+
+ recoveryTimeout = _recoveryTimeout.get();
+ }
+ }
+
CHECK(process == NULL);
process = new ExecutorProcess(
@@ -626,7 +679,8 @@ Status MesosExecutorDriver::start()
executorId,
local,
workDirectory,
- checkpoint);
+ checkpoint,
+ recoveryTimeout);
spawn(process);
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
index 004d90e..7bf127a 100644
--- a/src/launcher/launcher.cpp
+++ b/src/launcher/launcher.cpp
@@ -68,7 +68,8 @@ ExecutorLauncher::ExecutorLauncher(
const string& _hadoopHome,
bool _redirectIO,
bool _shouldSwitchUser,
- bool _checkpoint)
+ bool _checkpoint,
+ Duration _recoveryTimeout)
: slaveId(_slaveId),
frameworkId(_frameworkId),
executorId(_executorId),
@@ -82,7 +83,8 @@ ExecutorLauncher::ExecutorLauncher(
hadoopHome(_hadoopHome),
redirectIO(_redirectIO),
shouldSwitchUser(_shouldSwitchUser),
- checkpoint (_checkpoint) {}
+ checkpoint(_checkpoint),
+ recoveryTimeout(_recoveryTimeout) {}
ExecutorLauncher::~ExecutorLauncher() {}
@@ -456,6 +458,10 @@ map<string, string> ExecutorLauncher::getEnvironment()
env["MESOS_EXECUTOR_UUID"] = uuid.toString();
env["MESOS_CHECKPOINT"] = checkpoint ? "1" : "0";
+ if (checkpoint) {
+ env["MESOS_RECOVERY_TIMEOUT"] = stringify(recoveryTimeout);
+ }
+
return env;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.hpp b/src/launcher/launcher.hpp
index 637c9bc..104fe81 100644
--- a/src/launcher/launcher.hpp
+++ b/src/launcher/launcher.hpp
@@ -24,6 +24,7 @@
#include <mesos/mesos.hpp>
+#include <stout/duration.hpp>
#include <stout/uuid.hpp>
#include "slave/flags.hpp"
@@ -60,7 +61,8 @@ public:
const std::string& hadoopHome,
bool redirectIO,
bool shouldSwitchUser,
- bool checkpoint);
+ bool checkpoint,
+ Duration recoveryTimeout);
virtual ~ExecutorLauncher();
@@ -111,6 +113,9 @@ protected:
const bool redirectIO; // Whether to redirect stdout and stderr to files.
const bool shouldSwitchUser; // Whether to setuid to framework's user.
const bool checkpoint; // Whether the framework enabled checkpointing.
+
+ // Executor suicide timeout for slave recovery.
+ const Duration recoveryTimeout;
};
} // namespace launcher {
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/main.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/main.cpp b/src/launcher/main.cpp
index 5674afb..de64609 100644
--- a/src/launcher/main.cpp
+++ b/src/launcher/main.cpp
@@ -16,8 +16,11 @@
* limitations under the License.
*/
+#include <string>
+
#include <mesos/mesos.hpp>
+#include <stout/duration.hpp>
#include <stout/strings.hpp>
#include <stout/os.hpp>
@@ -26,6 +29,8 @@
using namespace mesos;
using namespace mesos::internal; // For 'utils'.
+using std::string;
+
int main(int argc, char** argv)
{
@@ -57,6 +62,24 @@ int main(int argc, char** argv)
commandInfo.add_uris()->MergeFrom(uri);
}
+ bool checkpoint = os::getenv("MESOS_CHECKPOINT", false) == "1";
+
+ Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+
+ // Get the recovery timeout if checkpointing is enabled.
+ if (checkpoint) {
+ string value = os::getenv("MESOS_RECOVERY_TIMEOUT", false);
+
+ if (!value.empty()) {
+ Try<Duration> _recoveryTimeout = Duration::parse(value);
+
+ CHECK_SOME(_recoveryTimeout)
+ << "Cannot parse MESOS_RECOVERY_TIMEOUT '" + value + "'";
+
+ recoveryTimeout = _recoveryTimeout.get();
+ }
+ }
+
return mesos::internal::launcher::ExecutorLauncher(
slaveId,
frameworkId,
@@ -71,6 +94,7 @@ int main(int argc, char** argv)
os::getenv("MESOS_HADOOP_HOME"),
os::getenv("MESOS_REDIRECT_IO") == "1",
os::getenv("MESOS_SWITCH_USER") == "1",
- os::getenv("MESOS_CHECKPOINT") == "1")
+ checkpoint,
+ recoveryTimeout)
.run();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index d4ccd11..676768e 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -623,7 +623,8 @@ void CgroupsIsolator::launchExecutor(
flags.hadoop_home,
!local,
flags.switch_user,
- frameworkInfo.checkpoint());
+ frameworkInfo.checkpoint(),
+ flags.recovery_timeout);
// First fetch the executor.
if (launcher.setup() < 0) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index e8d16ca..8c74c00 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -29,6 +29,7 @@ const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10);
const Duration GC_DELAY = Weeks(1);
const double GC_DISK_HEADROOM = 0.1;
const Duration DISK_WATCH_INTERVAL = Minutes(1);
+const Duration RECOVERY_TIMEOUT = Minutes(15);
const Duration RESOURCE_MONITORING_INTERVAL = Seconds(5);
const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
const uint32_t MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 901fdf2..bbbbfd3 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -37,6 +37,7 @@ namespace slave {
extern const Duration EXECUTOR_REGISTRATION_TIMEOUT;
extern const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD;
extern const Duration EXECUTOR_REREGISTER_TIMEOUT;
+extern const Duration RECOVERY_TIMEOUT;
extern const Duration STATUS_UPDATE_RETRY_INTERVAL;
extern const Duration GC_DELAY;
extern const Duration DISK_WATCH_INTERVAL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 616be9b..ea1e4f7 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -141,6 +141,14 @@ public:
" and the slave registers with the master as a new slave.",
"reconnect");
+ add(&Flags::recovery_timeout,
+ "recovery_timeout",
+ "Amount of time alloted for the slave to recover. If the slave takes\n"
+ "longer than recovery_timeout to recover, any executors that are\n"
+ "waiting to reconnect to the slave will self-terminate.\n"
+ "NOTE: This flag is only applicable when checkpoint is enabled.\n",
+ RECOVERY_TIMEOUT);
+
add(&Flags::strict,
"strict",
"If strict=true, any and all recovery errors are considered fatal.\n"
@@ -189,6 +197,7 @@ public:
Duration resource_monitoring_interval;
bool checkpoint;
std::string recover;
+ Duration recovery_timeout;
bool strict;
#ifdef __linux__
std::string cgroups_hierarchy;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 24a7fb2..fa80293 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -136,7 +136,8 @@ void ProcessIsolator::launchExecutor(
flags.hadoop_home,
!local,
flags.switch_user,
- frameworkInfo.checkpoint());
+ frameworkInfo.checkpoint(),
+ flags.recovery_timeout);
// We get the environment map for launching mesos-launcher before
// the fork, because we have seen deadlock issues with ostringstream
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0735dba..57636c1 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -690,6 +690,99 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
}
+// The slave is stopped after a non-terminal update is received.
+// The command executor is expected to self-terminate while the slave
+// is down, because the recovery timeout elapses.
+// When the slave comes back up with recovery=reconnect, make
+// sure the task is properly transitioned to FAILED.
+TYPED_TEST(SlaveRecoveryTest, RecoveryTimeout)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ TypeParam isolator1;
+
+ // Set a short recovery timeout, as we can't control the executor
+ // driver time when using the process / cgroups isolators.
+ slave::Flags flags = this->CreateSlaveFlags();
+ flags.recovery_timeout = Milliseconds(1);
+
+ Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+
+ EXPECT_CALL(sched, statusUpdate(_, _));
+
+ Future<Nothing> _statusUpdateAcknowledgement =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait for the ACK to be checkpointed.
+ AWAIT_READY(_statusUpdateAcknowledgement);
+
+ this->Stop(slave.get());
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Ensure the executor terminates by causing the recovery timeout
+ // to elapse while disconnected from the slave.
+ os::sleep(Milliseconds(1));
+
+ Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+ // Restart the slave (use same flags) with a new isolator.
+ TypeParam isolator2;
+
+ slave = this->StartSlave(&isolator2, flags);
+ ASSERT_SOME(slave);
+
+ Clock::pause();
+
+ AWAIT_READY(_recover);
+
+ Clock::advance(EXECUTOR_REREGISTER_TIMEOUT);
+ Clock::settle();
+
+ Clock::resume();
+
+ // Scheduler should receive the TASK_FAILED update.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_FAILED, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}
+
+
// The slave is stopped after an executor is completed (i.e., it has
// terminated and all its updates have been acknowledged).
// When it comes back up with recovery=reconnect, make