You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:31 UTC
[2/6] git commit: Updated status update manager to forward updates via slave.
Updated status update manager to forward updates via slave.
Review: https://reviews.apache.org/r/26846
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e64dda41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e64dda41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e64dda41
Branch: refs/heads/master
Commit: e64dda411bc83963179c92ae71caefa8d21b54b4
Parents: 616d401
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Oct 15 18:22:46 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:08 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 6 +-
src/slave/slave.cpp | 122 ++++++++++++++++++-------------
src/slave/slave.hpp | 4 +
src/slave/status_update_manager.cpp | 47 ++----------
src/slave/status_update_manager.hpp | 2 -
src/tests/slave_recovery_tests.cpp | 19 +++--
6 files changed, 94 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index be910d9..f04c085 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3294,14 +3294,12 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
}
-// NOTE: We cannot use 'from' here to identify the slave as this is
-// now sent by the StatusUpdateManagerProcess and master itself when
-// it generates TASK_LOST messages. Only 'pid' can be used to identify
-// the slave.
// TODO(bmahler): The master will not release resources until the
// slave receives acknowlegements for all non-terminal updates. This
// means if a framework is down, the resources will remain allocated
// even though the tasks are terminal on the slaves!
+// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' because
+// the status updates will be sent by the slave.
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
{
++metrics.messages_status_update;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7b5474a..afcb669 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -497,7 +497,7 @@ void Slave::finalize()
}
}
- if (state == TERMINATING || flags.recover == "cleanup") {
+ if (state == TERMINATING) {
// We remove the "latest" symlink in meta directory, so that the
// slave doesn't recover the state when it restarts and registers
// as a new slave with the master.
@@ -533,14 +533,8 @@ void Slave::shutdown(const UPID& from, const string& message)
if (frameworks.empty()) { // Terminate slave if there are no frameworks.
terminate(self());
} else {
- // NOTE: The slave will terminate after all
- // executors have terminated.
- // TODO(vinod): Wait until all updates have been acknowledged.
- // This is tricky without persistent state at master because the
- // slave might wait forever for status update acknowledgements,
- // since it cannot reliably know when a framework has shut down.
- // A short-term fix could be to wait for a certain time for ACKs
- // and then shutdown.
+ // NOTE: The slave will terminate after all the executors have
+ // terminated.
// NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
// can potentially remove a framework from 'frameworks'.
foreach (const FrameworkID& frameworkId, frameworks.keys()) {
@@ -600,22 +594,11 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master)
LOG(INFO) << "New master detected at " << master.get();
link(master.get());
- // Inform the status updates manager about the new master.
- statusUpdateManager->newMasterDetected(master.get());
-
if (state == TERMINATING) {
LOG(INFO) << "Skipping registration because slave is terminating";
return;
}
- // The slave does not (re-)register if it is in the cleanup mode
- // because we do not want to accept new tasks.
- if (flags.recover == "cleanup") {
- LOG(INFO)
- << "Skipping registration because slave was started in cleanup mode";
- return;
- }
-
// Wait for a random amount of time before authentication or
// registration.
Duration duration =
@@ -829,6 +812,11 @@ void Slave::reregistered(
CHECK_SOME(master);
LOG(INFO) << "Re-registered with master " << master.get();
state = RUNNING;
+
+ // Inform status update manager to immediately resend any
+ // pending updates.
+ statusUpdateManager->flush();
+
break;
case RUNNING:
CHECK_SOME(master);
@@ -931,6 +919,8 @@ void Slave::doReliableRegistration(const Duration& duration)
CHECK(state == DISCONNECTED) << state;
+ CHECK_NE("cleanup", flags.recover);
+
if (!info.has_id()) {
// Registering for the first time.
RegisterSlaveMessage message;
@@ -2373,6 +2363,33 @@ void Slave::__statusUpdate(
}
+// NOTE: An acknowledgement for this update might have already been
+// processed by the slave but not the status update manager.
+void Slave::forward(const StatusUpdate& update)
+{
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RUNNING) {
+ LOG(WARNING) << "Dropping status update " << update
+ << " sent by status update manager because the slave"
+ << " is in " << state << " state";
+ return;
+ }
+
+ CHECK_SOME(master);
+ LOG(INFO) << "Forwarding the update " << update << " to " << master.get();
+
+ // Forward the update to master.
+ StatusUpdateMessage message;
+ message.mutable_update()->MergeFrom(update);
+ message.set_pid(self()); // The ACK will be first received by the slave.
+
+ send(master.get(), message);
+}
+
+
void Slave::executorMessage(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
@@ -2866,9 +2883,10 @@ void Slave::executorTerminated(
if (master.isSome()) { send(master.get(), message); }
}
- // Remove the executor if either the framework is terminating or
- // there are no incomplete tasks.
- if (framework->state == Framework::TERMINATING ||
+ // Remove the executor if either the slave or framework is
+ // terminating or there are no incomplete tasks.
+ if (state == TERMINATING ||
+ framework->state == Framework::TERMINATING ||
!executor->incompleteTasks()) {
removeExecutor(framework, executor);
}
@@ -2900,13 +2918,15 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
framework->state == Framework::TERMINATING)
<< framework->state;
- // Check that this executor has terminated and either has no
- // pending updates or the framework is terminating. We don't
- // care for pending updates when a framework is terminating
- // because the framework cannot ACK them.
+ // Check that this executor has terminated.
CHECK(executor->state == Executor::TERMINATED) << executor->state;
- CHECK(framework->state == Framework::TERMINATING ||
- !executor->incompleteTasks());
+
+ // Check that either 1) the executor has no tasks with pending
+ // updates or 2) the slave/framework is terminating, because no
+ // acknowledgements might be received.
+ CHECK(!executor->incompleteTasks() ||
+ state == TERMINATING ||
+ framework->state == Framework::TERMINATING);
// Write a sentinel file to indicate that this executor
// is completed.
@@ -3006,17 +3026,8 @@ void Slave::removeFramework(Framework* framework)
// Pass ownership of the framework pointer.
completedFrameworks.push_back(Owned<Framework>(framework));
- if (frameworks.empty()) {
- // Terminate the slave if
- // 1) it's being shut down or
- // 2) it's started in cleanup mode and recovery finished.
- // TODO(vinod): Instead of doing it this way, shutdownFramework()
- // and shutdownExecutor() could return Futures and a slave could
- // shutdown when all the Futures are satisfied (e.g., collect()).
- if (state == TERMINATING ||
- (flags.recover == "cleanup" && !recovered.future().isPending())) {
- terminate(self());
- }
+ if (state == TERMINATING && frameworks.empty()) {
+ terminate(self());
}
}
@@ -3362,7 +3373,6 @@ void Slave::__recover(const Future<Nothing>& future)
LOG(INFO) << "Finished recovery";
CHECK_EQ(RECOVERING, state);
- state = DISCONNECTED;
// Checkpoint boot ID.
Try<string> bootId = os::bootId();
@@ -3413,18 +3423,30 @@ void Slave::__recover(const Future<Nothing>& future)
}
}
- recovered.set(Nothing()); // Signal recovery.
+ if (flags.recover == "reconnect") {
+ state = DISCONNECTED;
- // Terminate slave, if it has no active frameworks and is started
- // in 'cleanup' mode.
- if (frameworks.empty() && flags.recover == "cleanup") {
- terminate(self());
- return;
+ // Start detecting masters.
+ detection = detector->detect()
+ .onAny(defer(self(), &Slave::detected, lambda::_1));
+ } else {
+ // Slave started in cleanup mode.
+ CHECK_EQ("cleanup", flags.recover);
+ state = TERMINATING;
+
+ if (frameworks.empty()) {
+ terminate(self());
+ }
+
+ // If there are active executors/frameworks, the slave will
+ // shutdown when all the executors are terminated. Note that
+ // the executors are guaranteed to terminate because they
+ // are sent shutdown signal in '_recover()' which results in
+ // 'Containerizer::destroy()' being called if the termination
+ // doesn't happen within a timeout.
}
- // Start detecting masters.
- detection = detector->detect()
- .onAny(defer(self(), &Slave::detected, lambda::_1));
+ recovered.set(Nothing()); // Signal recovery.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ccc0e03..439052e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -197,6 +197,10 @@ public:
const StatusUpdate& update,
const process::UPID& pid);
+ // This is called by status update manager to forward a
+ // status update to the master.
+ void forward(const StatusUpdate& update);
+
void statusUpdateAcknowledgement(
const process::UPID& from,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 5d5cf23..fb35ace 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -91,8 +91,6 @@ public:
const string& rootDir,
const Option<SlaveState>& state);
- void newMasterDetected(const UPID& pid);
-
void flush();
void cleanup(const FrameworkID& frameworkId);
@@ -135,7 +133,6 @@ private:
const TaskID& taskId,
const FrameworkID& frameworkId);
- UPID master;
Flags flags;
PID<Slave> slave;
hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
@@ -162,23 +159,13 @@ void StatusUpdateManagerProcess::initialize(
}
-void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid)
-{
- LOG(INFO) << "New master detected at " << pid;
- master = pid;
-
- // Retry any pending status updates.
- flush();
-}
-
-
void StatusUpdateManagerProcess::flush()
{
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
if (!stream->pending.empty()) {
const StatusUpdate& update = stream->pending.front();
- LOG(WARNING) << "Resending status update " << update;
+ LOG(WARNING) << "Flushing status update " << update;
stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
}
@@ -256,18 +243,11 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
}
// At the end of the replay, the stream is either terminated or
- // contains only unacknowledged, if any, pending updates.
+ // contains only unacknowledged, if any, pending updates. The
+ // pending updates will be flushed after the slave
+ // re-registers with the master.
if (stream->terminated) {
cleanupStatusUpdateStream(task.id, framework.id);
- } else {
- // If a stream has pending updates after the replay,
- // send the first pending update.
- const Result<StatusUpdate>& next = stream->next();
- CHECK(!next.isError());
- if (next.isSome()) {
- stream->timeout =
- forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
- }
}
}
}
@@ -369,18 +349,10 @@ Timeout StatusUpdateManagerProcess::forward(
const StatusUpdate& update,
const Duration& duration)
{
- if (master) {
- LOG(INFO) << "Forwarding status update " << update << " to " << master;
+ VLOG(1) << "Forwarding update " << update << " to the slave";
- StatusUpdateMessage message;
- message.mutable_update()->MergeFrom(update);
- message.set_pid(slave); // The ACK will be first received by the slave.
-
- send(master, message);
- } else {
- LOG(WARNING) << "Not forwarding status update " << update
- << " because no master is elected yet";
- }
+ // Forward the update to the slave.
+ dispatch(slave, &Slave::forward, update);
// Send a message to self to resend after some delay if no ACK is received.
return delay(duration,
@@ -622,11 +594,6 @@ Future<Nothing> StatusUpdateManager::recover(
}
-void StatusUpdateManager::newMasterDetected(const UPID& pid)
-{
- dispatch(process, &StatusUpdateManagerProcess::newMasterDetected, pid);
-}
-
void StatusUpdateManager::flush()
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index c371e55..1c1a8a8 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -117,8 +117,6 @@ public:
const std::string& rootDir,
const Option<state::SlaveState>& state);
- // TODO(vinod): Remove this hack once the new leader detector code is merged.
- void newMasterDetected(const process::UPID& pid);
// Resend all the pending updates right away.
// This is useful when the updates were pending because there was
http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 4fb357b..813e2d6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -912,7 +912,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
// The slave is stopped after a non-terminal update is received.
// Slave is restarted in recovery=cleanup mode. It kills the command
-// executor, and transitions the task to FAILED.
+// executor, and terminates. Master should then send TASK_LOST.
TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
{
Try<PID<Master> > master = this->StartMaster();
@@ -965,8 +965,8 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
this->Stop(slave.get());
delete containerizer1.get();
- // Slave in cleanup mode shouldn't reregister with slave and hence
- // no offers should be made by the master.
+ // Slave in cleanup mode shouldn't re-register with the master and
+ // hence no offers should be made by the master.
EXPECT_CALL(sched, resourceOffers(_, _))
.Times(0);
@@ -976,12 +976,14 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+ EXPECT_CALL(sched, slaveLost(_, _))
+ .Times(AtMost(1));
+
// Restart the slave in 'cleanup' recovery mode with a new isolator.
Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
ASSERT_SOME(containerizer2);
flags.recover = "cleanup";
-
slave = this->StartSlave(containerizer2.get(), flags);
ASSERT_SOME(slave);
@@ -993,15 +995,12 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
Clock::settle();
}
- // Scheduler should receive the TASK_FAILED update.
- AWAIT_READY(status);
- ASSERT_EQ(TASK_FAILED, status.get().state());
-
// Wait for recovery to complete.
AWAIT_READY(__recover);
- Clock::settle();
- Clock::resume();
+ // Scheduler should receive the TASK_LOST update.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_LOST, status.get().state());
driver.stop();
driver.join();