You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink on "here") was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:32 UTC
[3/6] git commit: Added pause() and resume() to status update manager.
Added pause() and resume() to status update manager.
Review: https://reviews.apache.org/r/26957
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65c3c363
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65c3c363
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65c3c363
Branch: refs/heads/master
Commit: 65c3c3639b385d880dbfe10bc4f652655695c8b3
Parents: e64dda4
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Oct 17 15:26:52 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:08 2014 -0700
----------------------------------------------------------------------
src/local/local.cpp | 2 +-
src/slave/main.cpp | 2 +-
src/slave/slave.cpp | 15 ++++---
src/slave/status_update_manager.cpp | 75 ++++++++++++++++++++++----------
src/slave/status_update_manager.hpp | 18 +++++---
src/tests/cluster.hpp | 2 +-
src/tests/mesos.cpp | 8 +++-
src/tests/mesos.hpp | 4 +-
8 files changed, 84 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 66de798..2756d42 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -214,7 +214,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
}
garbageCollectors->push_back(new GarbageCollector());
- statusUpdateManagers->push_back(new StatusUpdateManager());
+ statusUpdateManagers->push_back(new StatusUpdateManager(flags));
Try<Containerizer*> containerizer = Containerizer::create(flags, true);
if (containerizer.isError()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index b27cc32..bf56f69 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -165,7 +165,7 @@ int main(int argc, char** argv)
Files files;
GarbageCollector gc;
- StatusUpdateManager statusUpdateManager;
+ StatusUpdateManager statusUpdateManager(flags);
Slave* slave = new Slave(
flags,
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index afcb669..a98e408 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -318,7 +318,7 @@ void Slave::initialize()
LOG(INFO) << "Slave hostname: " << info.hostname();
LOG(INFO) << "Slave checkpoint: " << stringify(flags.checkpoint);
- statusUpdateManager->initialize(flags, self());
+ statusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1));
// Start disk monitoring.
// NOTE: We send a delayed message here instead of directly calling
@@ -573,6 +573,9 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master)
state = DISCONNECTED;
}
+ // Pause the status updates.
+ statusUpdateManager->pause();
+
if (_master.isFailed()) {
EXIT(1) << "Failed to detect a master: " << _master.failure();
}
@@ -749,6 +752,9 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
<< "; given slave ID " << slaveId;
state = RUNNING;
+
+ statusUpdateManager->resume(); // Resume status updates.
+
info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
if (flags.checkpoint) {
@@ -813,10 +819,7 @@ void Slave::reregistered(
LOG(INFO) << "Re-registered with master " << master.get();
state = RUNNING;
- // Inform status update manager to immediately resend any
- // pending updates.
- statusUpdateManager->flush();
-
+ statusUpdateManager->resume(); // Resume status updates.
break;
case RUNNING:
CHECK_SOME(master);
@@ -1699,7 +1702,7 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
// Inform status update manager to immediately resend any pending
// updates.
- statusUpdateManager->flush();
+ statusUpdateManager->resume();
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index fb35ace..9bdbf5e 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -24,6 +24,7 @@
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/protobuf.hpp>
#include <stout/utils.hpp>
@@ -37,6 +38,8 @@
#include "slave/state.hpp"
#include "slave/status_update_manager.hpp"
+using lambda::function;
+
using std::string;
using process::wait; // Necessary on some OS's to disambiguate.
@@ -61,16 +64,14 @@ class StatusUpdateManagerProcess
: public ProtobufProcess<StatusUpdateManagerProcess>
{
public:
- StatusUpdateManagerProcess() {}
+ StatusUpdateManagerProcess(const Flags& flags);
virtual ~StatusUpdateManagerProcess();
// Explicitely use 'initialize' since we're overloading below.
using process::ProcessBase::initialize;
// StatusUpdateManager implementation.
- void initialize(
- const Flags& flags,
- const PID<Slave>& slave);
+ void initialize(const function<void(const StatusUpdate&)>& forward);
Future<Nothing> update(
const StatusUpdate& update,
@@ -91,7 +92,8 @@ public:
const string& rootDir,
const Option<SlaveState>& state);
- void flush();
+ void pause();
+ void resume();
void cleanup(const FrameworkID& frameworkId);
@@ -133,12 +135,19 @@ private:
const TaskID& taskId,
const FrameworkID& frameworkId);
- Flags flags;
- PID<Slave> slave;
+ const Flags flags;
+ bool paused;
+
+ function<void(const StatusUpdate&)> forward_;
+
hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
};
+StatusUpdateManagerProcess::StatusUpdateManagerProcess(const Flags& _flags)
+ : flags(_flags), paused(false) {}
+
+
StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
{
foreachkey (const FrameworkID& frameworkId, streams) {
@@ -151,21 +160,29 @@ StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
void StatusUpdateManagerProcess::initialize(
- const Flags& _flags,
- const PID<Slave>& _slave)
+ const function<void(const StatusUpdate&)>& forward)
+{
+ forward_ = forward;
+}
+
+
+void StatusUpdateManagerProcess::pause()
{
- flags = _flags;
- slave = _slave;
+ LOG(INFO) << "Pausing sending status updates";
+ paused = true;
}
-void StatusUpdateManagerProcess::flush()
+void StatusUpdateManagerProcess::resume()
{
+ LOG(INFO) << "Resuming sending status updates";
+ paused = false;
+
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
if (!stream->pending.empty()) {
const StatusUpdate& update = stream->pending.front();
- LOG(WARNING) << "Flushing status update " << update;
+ LOG(WARNING) << "Resending status update " << update;
stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
}
@@ -330,7 +347,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
// Forward the status update to the master if this is the first in the stream.
// Subsequent status updates will get sent in 'acknowledgement()'.
- if (stream->pending.size() == 1) {
+ if (!paused && stream->pending.size() == 1) {
CHECK(stream->timeout.isNone());
const Result<StatusUpdate>& next = stream->next();
if (next.isError()) {
@@ -349,10 +366,12 @@ Timeout StatusUpdateManagerProcess::forward(
const StatusUpdate& update,
const Duration& duration)
{
+ CHECK(!paused);
+
VLOG(1) << "Forwarding update " << update << " to the slave";
- // Forward the update to the slave.
- dispatch(slave, &Slave::forward, update);
+ // Forward the update.
+ forward_(update);
// Send a message to self to resend after some delay if no ACK is received.
return delay(duration,
@@ -426,7 +445,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
<< " but updates are still pending";
}
cleanupStatusUpdateStream(taskId, frameworkId);
- } else if (next.isSome()) {
+ } else if (!paused && next.isSome()) {
// Forward the next queued status update.
stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
@@ -438,6 +457,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
// TODO(vinod): There should be a limit on the retries.
void StatusUpdateManagerProcess::timeout(const Duration& duration)
{
+ if (paused) {
+ return;
+ }
+
// Check and see if we should resend any status updates.
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
@@ -520,9 +543,9 @@ void StatusUpdateManagerProcess::cleanupStatusUpdateStream(
}
-StatusUpdateManager::StatusUpdateManager()
+StatusUpdateManager::StatusUpdateManager(const Flags& flags)
{
- process = new StatusUpdateManagerProcess();
+ process = new StatusUpdateManagerProcess(flags);
spawn(process);
}
@@ -536,10 +559,9 @@ StatusUpdateManager::~StatusUpdateManager()
void StatusUpdateManager::initialize(
- const Flags& flags,
- const PID<Slave>& slave)
+ const function<void(const StatusUpdate&)>& forward)
{
- dispatch(process, &StatusUpdateManagerProcess::initialize, flags, slave);
+ dispatch(process, &StatusUpdateManagerProcess::initialize, forward);
}
@@ -594,10 +616,15 @@ Future<Nothing> StatusUpdateManager::recover(
}
+void StatusUpdateManager::pause()
+{
+ dispatch(process, &StatusUpdateManagerProcess::pause);
+}
+
-void StatusUpdateManager::flush()
+void StatusUpdateManager::resume()
{
- dispatch(process, &StatusUpdateManagerProcess::flush);
+ dispatch(process, &StatusUpdateManagerProcess::resume);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index 1c1a8a8..2852884 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -31,6 +31,7 @@
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
@@ -72,12 +73,12 @@ struct StatusUpdateStream;
class StatusUpdateManager
{
public:
- StatusUpdateManager();
+ StatusUpdateManager(const Flags& flags);
virtual ~StatusUpdateManager();
- void initialize(
- const Flags& flags,
- const process::PID<Slave>& slave);
+ // Expects a callback 'forward' which gets called whenever there is
+ // a new status update that needs to be forwarded to the master.
+ void initialize(const lambda::function<void(const StatusUpdate&)>& forward);
// TODO(vinod): Come up with better names/signatures for the
// checkpointing and non-checkpointing 'update()' functions.
@@ -118,10 +119,15 @@ public:
const Option<state::SlaveState>& state);
- // Resend all the pending updates right away.
+ // Pause sending updates.
+ // This is useful when the slave is disconnected because a
+ // disconnected slave will drop the updates.
+ void pause();
+
+ // Unpause and resend all the pending updates right away.
// This is useful when the updates were pending because there was
// no master elected (e.g., during recovery) or framework failed over.
- void flush();
+ void resume();
// Closes all the status update streams corresponding to this framework.
// NOTE: This stops retrying any pending status updates for this framework.
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index ee194ad..fa5eeef 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -488,7 +488,7 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
// Create a status update manager if one wasn't provided.
if (statusUpdateManager.isNone()) {
- slave.statusUpdateManager.reset(new slave::StatusUpdateManager());
+ slave.statusUpdateManager.reset(new slave::StatusUpdateManager(flags));
}
slave.flags = flags;
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 147e23f..bff10d2 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -349,7 +349,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
containerizer,
&files,
&gc,
- &statusUpdateManager)
+ statusUpdateManager = new slave::StatusUpdateManager(flags))
{
// Set up default behaviors, calling the original methods.
EXPECT_CALL(*this, runTask(_, _, _, _, _)).
@@ -363,6 +363,12 @@ MockSlave::MockSlave(const slave::Flags& flags,
}
+MockSlave::~MockSlave()
+{
+ delete statusUpdateManager;
+}
+
+
void MockSlave::unmocked_runTask(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index e40575c..e36e138 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -548,7 +548,7 @@ public:
MasterDetector* detector,
slave::Containerizer* containerizer);
- virtual ~MockSlave() {}
+ virtual ~MockSlave();
MOCK_METHOD5(runTask, void(
const process::UPID& from,
@@ -597,7 +597,7 @@ public:
private:
Files files;
MockGarbageCollector gc;
- slave::StatusUpdateManager statusUpdateManager;
+ slave::StatusUpdateManager* statusUpdateManager;
};