You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:32 UTC

[3/6] git commit: Added pause() and resume() to status update manager.

Added pause() and resume() to status update manager.

Review: https://reviews.apache.org/r/26957


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65c3c363
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65c3c363
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65c3c363

Branch: refs/heads/master
Commit: 65c3c3639b385d880dbfe10bc4f652655695c8b3
Parents: e64dda4
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Oct 17 15:26:52 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:08 2014 -0700

----------------------------------------------------------------------
 src/local/local.cpp                 |  2 +-
 src/slave/main.cpp                  |  2 +-
 src/slave/slave.cpp                 | 15 ++++---
 src/slave/status_update_manager.cpp | 75 ++++++++++++++++++++++----------
 src/slave/status_update_manager.hpp | 18 +++++---
 src/tests/cluster.hpp               |  2 +-
 src/tests/mesos.cpp                 |  8 +++-
 src/tests/mesos.hpp                 |  4 +-
 8 files changed, 84 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 66de798..2756d42 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -214,7 +214,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     }
 
     garbageCollectors->push_back(new GarbageCollector());
-    statusUpdateManagers->push_back(new StatusUpdateManager());
+    statusUpdateManagers->push_back(new StatusUpdateManager(flags));
 
     Try<Containerizer*> containerizer = Containerizer::create(flags, true);
     if (containerizer.isError()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index b27cc32..bf56f69 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -165,7 +165,7 @@ int main(int argc, char** argv)
 
   Files files;
   GarbageCollector gc;
-  StatusUpdateManager statusUpdateManager;
+  StatusUpdateManager statusUpdateManager(flags);
 
   Slave* slave = new Slave(
       flags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index afcb669..a98e408 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -318,7 +318,7 @@ void Slave::initialize()
   LOG(INFO) << "Slave hostname: " << info.hostname();
   LOG(INFO) << "Slave checkpoint: " << stringify(flags.checkpoint);
 
-  statusUpdateManager->initialize(flags, self());
+  statusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1));
 
   // Start disk monitoring.
   // NOTE: We send a delayed message here instead of directly calling
@@ -573,6 +573,9 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master)
     state = DISCONNECTED;
   }
 
+  // Pause the status updates.
+  statusUpdateManager->pause();
+
   if (_master.isFailed()) {
     EXIT(1) << "Failed to detect a master: " << _master.failure();
   }
@@ -749,6 +752,9 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
                 << "; given slave ID " << slaveId;
 
       state = RUNNING;
+
+      statusUpdateManager->resume(); // Resume status updates.
+
       info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
 
       if (flags.checkpoint) {
@@ -813,10 +819,7 @@ void Slave::reregistered(
       LOG(INFO) << "Re-registered with master " << master.get();
       state = RUNNING;
 
-      // Inform status update manager to immediately resend any
-      // pending updates.
-      statusUpdateManager->flush();
-
+      statusUpdateManager->resume(); // Resume status updates.
       break;
     case RUNNING:
       CHECK_SOME(master);
@@ -1699,7 +1702,7 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
 
       // Inform status update manager to immediately resend any pending
       // updates.
-      statusUpdateManager->flush();
+      statusUpdateManager->resume();
 
       break;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index fb35ace..9bdbf5e 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -24,6 +24,7 @@
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
 #include <stout/option.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/utils.hpp>
@@ -37,6 +38,8 @@
 #include "slave/state.hpp"
 #include "slave/status_update_manager.hpp"
 
+using lambda::function;
+
 using std::string;
 
 using process::wait; // Necessary on some OS's to disambiguate.
@@ -61,16 +64,14 @@ class StatusUpdateManagerProcess
   : public ProtobufProcess<StatusUpdateManagerProcess>
 {
 public:
-  StatusUpdateManagerProcess() {}
+  StatusUpdateManagerProcess(const Flags& flags);
   virtual ~StatusUpdateManagerProcess();
 
   // Explicitely use 'initialize' since we're overloading below.
   using process::ProcessBase::initialize;
 
   // StatusUpdateManager implementation.
-  void initialize(
-      const Flags& flags,
-      const PID<Slave>& slave);
+  void initialize(const function<void(const StatusUpdate&)>& forward);
 
   Future<Nothing> update(
       const StatusUpdate& update,
@@ -91,7 +92,8 @@ public:
       const string& rootDir,
       const Option<SlaveState>& state);
 
-  void flush();
+  void pause();
+  void resume();
 
   void cleanup(const FrameworkID& frameworkId);
 
@@ -133,12 +135,19 @@ private:
       const TaskID& taskId,
       const FrameworkID& frameworkId);
 
-  Flags flags;
-  PID<Slave> slave;
+  const Flags flags;
+  bool paused;
+
+  function<void(const StatusUpdate&)> forward_;
+
   hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
 };
 
 
+StatusUpdateManagerProcess::StatusUpdateManagerProcess(const Flags& _flags)
+  : flags(_flags), paused(false) {}
+
+
 StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
 {
   foreachkey (const FrameworkID& frameworkId, streams) {
@@ -151,21 +160,29 @@ StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
 
 
 void StatusUpdateManagerProcess::initialize(
-    const Flags& _flags,
-    const PID<Slave>& _slave)
+    const function<void(const StatusUpdate&)>& forward)
+{
+  forward_ = forward;
+}
+
+
+void StatusUpdateManagerProcess::pause()
 {
-  flags = _flags;
-  slave = _slave;
+  LOG(INFO) << "Pausing sending status updates";
+  paused = true;
 }
 
 
-void StatusUpdateManagerProcess::flush()
+void StatusUpdateManagerProcess::resume()
 {
+  LOG(INFO) << "Resuming sending status updates";
+  paused = false;
+
   foreachkey (const FrameworkID& frameworkId, streams) {
     foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
       if (!stream->pending.empty()) {
         const StatusUpdate& update = stream->pending.front();
-        LOG(WARNING) << "Flushing status update " << update;
+        LOG(WARNING) << "Resending status update " << update;
         stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
       }
     }
@@ -330,7 +347,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
 
   // Forward the status update to the master if this is the first in the stream.
   // Subsequent status updates will get sent in 'acknowledgement()'.
-  if (stream->pending.size() == 1) {
+  if (!paused && stream->pending.size() == 1) {
     CHECK(stream->timeout.isNone());
     const Result<StatusUpdate>& next = stream->next();
     if (next.isError()) {
@@ -349,10 +366,12 @@ Timeout StatusUpdateManagerProcess::forward(
     const StatusUpdate& update,
     const Duration& duration)
 {
+  CHECK(!paused);
+
   VLOG(1) << "Forwarding update " << update << " to the slave";
 
-  // Forward the update to the slave.
-  dispatch(slave, &Slave::forward, update);
+  // Forward the update.
+  forward_(update);
 
   // Send a message to self to resend after some delay if no ACK is received.
   return delay(duration,
@@ -426,7 +445,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
                    << " but updates are still pending";
     }
     cleanupStatusUpdateStream(taskId, frameworkId);
-  } else if (next.isSome()) {
+  } else if (!paused && next.isSome()) {
     // Forward the next queued status update.
     stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
   }
@@ -438,6 +457,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
 // TODO(vinod): There should be a limit on the retries.
 void StatusUpdateManagerProcess::timeout(const Duration& duration)
 {
+  if (paused) {
+    return;
+  }
+
   // Check and see if we should resend any status updates.
   foreachkey (const FrameworkID& frameworkId, streams) {
     foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
@@ -520,9 +543,9 @@ void StatusUpdateManagerProcess::cleanupStatusUpdateStream(
 }
 
 
-StatusUpdateManager::StatusUpdateManager()
+StatusUpdateManager::StatusUpdateManager(const Flags& flags)
 {
-  process = new StatusUpdateManagerProcess();
+  process = new StatusUpdateManagerProcess(flags);
   spawn(process);
 }
 
@@ -536,10 +559,9 @@ StatusUpdateManager::~StatusUpdateManager()
 
 
 void StatusUpdateManager::initialize(
-    const Flags& flags,
-    const PID<Slave>& slave)
+    const function<void(const StatusUpdate&)>& forward)
 {
-  dispatch(process, &StatusUpdateManagerProcess::initialize, flags, slave);
+  dispatch(process, &StatusUpdateManagerProcess::initialize, forward);
 }
 
 
@@ -594,10 +616,15 @@ Future<Nothing> StatusUpdateManager::recover(
 }
 
 
+void StatusUpdateManager::pause()
+{
+  dispatch(process, &StatusUpdateManagerProcess::pause);
+}
+
 
-void StatusUpdateManager::flush()
+void StatusUpdateManager::resume()
 {
-  dispatch(process, &StatusUpdateManagerProcess::flush);
+  dispatch(process, &StatusUpdateManagerProcess::resume);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index 1c1a8a8..2852884 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -31,6 +31,7 @@
 
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
 #include <stout/none.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
@@ -72,12 +73,12 @@ struct StatusUpdateStream;
 class StatusUpdateManager
 {
 public:
-  StatusUpdateManager();
+  StatusUpdateManager(const Flags& flags);
   virtual ~StatusUpdateManager();
 
-  void initialize(
-      const Flags& flags,
-      const process::PID<Slave>& slave);
+  // Expects a callback 'forward' which gets called whenever there is
+  // a new status update that needs to be forwarded to the master.
+  void initialize(const lambda::function<void(const StatusUpdate&)>& forward);
 
   // TODO(vinod): Come up with better names/signatures for the
   // checkpointing and non-checkpointing 'update()' functions.
@@ -118,10 +119,15 @@ public:
       const Option<state::SlaveState>& state);
 
 
-  // Resend all the pending updates right away.
+  // Pause sending updates.
+  // This is useful when the slave is disconnected because a
+  // disconnected slave will drop the updates.
+  void pause();
+
+  // Unpause and resend all the pending updates right away.
   // This is useful when the updates were pending because there was
   // no master elected (e.g., during recovery) or framework failed over.
-  void flush();
+  void resume();
 
   // Closes all the status update streams corresponding to this framework.
   // NOTE: This stops retrying any pending status updates for this framework.

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index ee194ad..fa5eeef 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -488,7 +488,7 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
 
   // Create a status update manager if one wasn't provided.
   if (statusUpdateManager.isNone()) {
-    slave.statusUpdateManager.reset(new slave::StatusUpdateManager());
+    slave.statusUpdateManager.reset(new slave::StatusUpdateManager(flags));
   }
 
   slave.flags = flags;

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 147e23f..bff10d2 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -349,7 +349,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
       containerizer,
       &files,
       &gc,
-      &statusUpdateManager)
+      statusUpdateManager = new slave::StatusUpdateManager(flags))
 {
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _)).
@@ -363,6 +363,12 @@ MockSlave::MockSlave(const slave::Flags& flags,
 }
 
 
+MockSlave::~MockSlave()
+{
+  delete statusUpdateManager;
+}
+
+
 void MockSlave::unmocked_runTask(
     const process::UPID& from,
     const FrameworkInfo& frameworkInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index e40575c..e36e138 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -548,7 +548,7 @@ public:
       MasterDetector* detector,
       slave::Containerizer* containerizer);
 
-  virtual ~MockSlave() {}
+  virtual ~MockSlave();
 
   MOCK_METHOD5(runTask, void(
       const process::UPID& from,
@@ -597,7 +597,7 @@ public:
 private:
   Files files;
   MockGarbageCollector gc;
-  slave::StatusUpdateManager statusUpdateManager;
+  slave::StatusUpdateManager* statusUpdateManager;
 };