You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/17 23:10:37 UTC

[1/5] git commit: Implemented framework API rate limiting.

Repository: mesos
Updated Branches:
  refs/heads/master 7d7d94b1d -> a99de1a98


Implemented framework API rate limiting.

Review: https://reviews.apache.org/r/22427


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a99de1a9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a99de1a9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a99de1a9

Branch: refs/heads/master
Commit: a99de1a985316aa00242d65b6f85200fdefe6a90
Parents: ebdb81e
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 14:54:11 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp             | 92 +++++++++++++++++++++++++++++++---
 src/master/master.hpp             | 23 +++++++--
 src/tests/rate_limiting_tests.cpp | 88 +++++++++++++++++++++++++++++++-
 3 files changed, 191 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a99de1a9/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c7357da..888657d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -30,6 +30,7 @@
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
+#include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/run.hpp>
 
@@ -74,6 +75,7 @@ using std::vector;
 using process::await;
 using process::wait; // Necessary on some OS's to disambiguate.
 using process::Clock;
+using process::ExitedEvent;
 using process::Failure;
 using process::Future;
 using process::MessageEvent;
@@ -81,6 +83,7 @@ using process::Owned;
 using process::PID;
 using process::Process;
 using process::Promise;
+using process::RateLimiter;
 using process::Time;
 using process::Timer;
 using process::UPID;
@@ -352,14 +355,32 @@ void Master::initialize()
   }
 
   if (flags.rate_limits.isSome()) {
-    Try<RateLimits> limits_ =
+    Try<RateLimits> limits =
       ::protobuf::parse<RateLimits>(flags.rate_limits.get());
-    if (limits_.isError()) {
-      EXIT(1) << "Invalid RateLimits format: " << limits_.error()
+    if (limits.isError()) {
+      EXIT(1) << "Invalid RateLimits format: " << limits.error()
               << " (see --rate_limits flag)";
     }
-    limits = limits_.get();
-    LOG(INFO) << "Framework rate limiting enabled";
+
+    // Add framework rate limiters.
+    foreach (const RateLimit& limit_, limits.get().limits()) {
+      if (limiters.contains(limit_.principal())) {
+        EXIT(1) << "Duplicate principal " << limit_.principal()
+                << " found in RateLimits configuration";
+      }
+
+      if (limit_.has_qps() && limit_.qps() <= 0) {
+        EXIT(1) << "Invalid qps: " << limit_.qps()
+                << ". It must be a positive number";
+      }
+
+      limiters.put(
+          limit_.principal(),
+          limit_.has_qps()
+            ? Option<Owned<RateLimiter> >::some(
+                Owned<RateLimiter>(new RateLimiter(limit_.qps())))
+            : Option<Owned<RateLimiter> >::none());
+    }
   }
 
   hashmap<string, RoleInfo> roleInfos;
@@ -799,11 +820,64 @@ void Master::visit(const MessageEvent& event)
     return;
   }
 
+  // Throttle the message if it's a framework message and a
+  // RateLimiter is configured for the framework's principal.
+  // The framework is not throttled if:
+  // 1) the principal is not specified by the framework (or)
+  // 2) the principal doesn't exist in RateLimits configuration (or)
+  // 3) the principal exists in RateLimits but 'qps' is not set.
+  if (principal.isSome() &&
+      limiters.contains(principal.get()) &&
+      limiters[principal.get()].isSome()) {
+    // Necessary to disambiguate.
+    typedef void(Self::*F)(const MessageEvent&);
+
+    limiters[principal.get()].get()->acquire()
+      .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+  } else {
+    _visit(event);
+  }
+}
+
+
+void Master::visit(const process::ExitedEvent& event)
+{
+  // Throttle the message if it's a framework message and a
+  // RateLimiter is configured for the framework's principal.
+  // Note that throttling ExitedEvent is necessary so the order
+  // between MessageEvents and ExitedEvents from the same PID is
+  // maintained.
+  // See comments in 'visit(const MessageEvent& event)' for when a
+  // message is not throttled.
+  const Option<string> principal = frameworks.principals.get(event.pid);
+  if (principal.isSome() &&
+      limiters.contains(principal.get()) &&
+      limiters[principal.get()].isSome()) {
+    // Necessary to disambiguate.
+    typedef void(Self::*F)(const ExitedEvent&);
+
+    limiters[principal.get()].get()->acquire()
+      .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+  } else {
+    _visit(event);
+  }
+}
+
+
+void Master::_visit(const MessageEvent& event)
+{
+  // Obtain the principal before processing the Message because the
+  // mapping may be deleted in handling 'UnregisterFrameworkMessage'
+  // but its counter still needs to be incremented for this message.
+  const Option<string> principal =
+    frameworks.principals.get(event.message->from);
+
   ProtobufProcess<Master>::visit(event);
 
   // Increment 'messages_processed' counter if it still exists.
   // Note that it could be removed in handling
-  // 'UnregisterFrameworkMessage'.
+  // 'UnregisterFrameworkMessage' if it's the last framework with
+  // this principal.
   if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
     Counter messages_processed =
       metrics.frameworks.get(principal.get()).get()->messages_processed;
@@ -812,6 +886,12 @@ void Master::visit(const MessageEvent& event)
 }
 
 
+void Master::_visit(const ExitedEvent& event)
+{
+  Process<Master>::visit(event);
+}
+
+
 void fail(const string& message, const string& failure)
 {
   LOG(FATAL) << message << ": " << failure;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a99de1a9/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d682613..2844446 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -59,6 +59,9 @@
 
 #include "messages/messages.hpp"
 
+namespace process {
+class RateLimiter; // Forward declaration.
+}
 
 namespace mesos {
 namespace internal {
@@ -235,6 +238,11 @@ protected:
   virtual void finalize();
   virtual void exited(const process::UPID& pid);
   virtual void visit(const process::MessageEvent& event);
+  virtual void visit(const process::ExitedEvent& event);
+
+  // Continuations of visit().
+  void _visit(const process::MessageEvent& event);
+  void _visit(const process::ExitedEvent& event);
 
   // Recovers state from the registrar.
   process::Future<Nothing> recover();
@@ -510,8 +518,6 @@ private:
   // Principals of authenticated frameworks/slaves keyed by PID.
   hashmap<process::UPID, std::string> authenticated;
 
-  Option<RateLimits> limits;
-
   int64_t nextFrameworkId; // Used to give each framework a unique ID.
   int64_t nextOfferId;     // Used to give each slot offer a unique ID.
   int64_t nextSlaveId;     // Used to give each slave a unique ID.
@@ -572,9 +578,11 @@ private:
       process::metrics::Counter messages_received;
 
       // Framework messages processed.
-      // NOTE: This doesn't include dropped messages. Also due to
-      // Master's asynchronous nature, this doesn't necessarily mean
-      // the work requested by this message has finished.
+      // NOTE: This doesn't include dropped messages. Processing of
+      // a message may be throttled by a RateLimiter if one is
+      // configured for this principal. Also due to Master's
+      // asynchronous nature, this doesn't necessarily mean the work
+      // requested by this message has finished.
       process::metrics::Counter messages_processed;
 
       explicit Frameworks(const std::string& principal)
@@ -707,6 +715,11 @@ private:
   process::Future<Option<Error> > validate(
       const FrameworkInfo& frameworkInfo,
       const process::UPID& from);
+
+  // RateLimiters keyed by the framework principal.
+  // Like Metrics::Frameworks, all frameworks of the same principal
+  // are throttled together at a common rate limit.
+  hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a99de1a9/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index baf912e..9a54461 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -48,7 +48,6 @@ using process::PID;
 using std::string;
 
 using testing::_;
-using testing::AtLeast;
 using testing::Eq;
 using testing::Return;
 
@@ -552,3 +551,90 @@ TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
 
   Shutdown();
 }
+
+
+// Verify that a framework is being correctly throttled at the
+// configured rate.
+TEST_F(RateLimitingTest, ThrottleFramework)
+{
+  // Throttle at 1 QPS.
+  RateLimits limits;
+  RateLimit* limit = limits.mutable_limits()->Add();
+  limit->set_principal(DEFAULT_CREDENTIAL.principal());
+  limit->set_qps(1);
+  master::Flags flags = CreateMasterFlags();
+
+  flags.rate_limits = JSON::Protobuf(limits);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  // Grab this message so we can resend it.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+
+  // Grab this message to get the scheduler's pid and to make sure we
+  // wait until the framework is registered.
+  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+  ASSERT_EQ(DRIVER_RUNNING, driver.start());
+
+  AWAIT_READY(registerFrameworkMessage);
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+  Clock::pause();
+
+  // Keep sending duplicate RegisterFrameworkMessages. Master sends
+  // FrameworkRegisteredMessage back after processing each of them.
+  {
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+    Clock::settle();
+
+    // The first message is not throttled because it's at the head of
+    // the queue.
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+  }
+
+  {
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+    Clock::settle();
+
+    // The 2nd message is throttled for a second.
+    Clock::advance(Milliseconds(500));
+    Clock::settle();
+
+    EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending());
+
+    Clock::advance(Milliseconds(501));
+    Clock::settle();
+
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+  }
+
+  Clock::resume();
+
+  EXPECT_EQ(DRIVER_STOPPED, driver.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver.join());
+
+  Shutdown();
+}


[5/5] git commit: Changed RateLimiter to work directly with 'double permitsPerSecond'.

Posted by ya...@apache.org.
Changed RateLimiter to work directly with 'double permitsPerSecond'.

Review: https://reviews.apache.org/r/22424


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a0b3490
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a0b3490
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a0b3490

Branch: refs/heads/master
Commit: 4a0b34907bdc83f46b3cf61d7578b6939c296834
Parents: d19e588
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 14:42:56 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/limiter.hpp | 30 ++++++++++++++------
 1 file changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4a0b3490/3rdparty/libprocess/include/process/limiter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/limiter.hpp b/3rdparty/libprocess/include/process/limiter.hpp
index bbe8226..846ec09 100644
--- a/3rdparty/libprocess/include/process/limiter.hpp
+++ b/3rdparty/libprocess/include/process/limiter.hpp
@@ -5,6 +5,7 @@
 
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/id.hpp>
 #include <process/future.hpp>
 #include <process/process.hpp>
 #include <process/timeout.hpp>
@@ -29,6 +30,7 @@ class RateLimiter
 {
 public:
   RateLimiter(int permits, const Duration& duration);
+  explicit RateLimiter(double permitsPerSecond);
   ~RateLimiter();
 
   // Returns a future that becomes ready when the permit is acquired.
@@ -46,11 +48,19 @@ private:
 class RateLimiterProcess : public Process<RateLimiterProcess>
 {
 public:
-  RateLimiterProcess(int _permits, const Duration& _duration)
-    : permits(_permits), duration(_duration)
+  RateLimiterProcess(int permits, const Duration& duration)
+    : ProcessBase(ID::generate("__limiter__"))
   {
     CHECK_GT(permits, 0);
     CHECK_GT(duration.secs(), 0);
+    permitsPerSecond = permits / duration.secs();
+  }
+
+  explicit RateLimiterProcess(double _permitsPerSecond)
+    : ProcessBase(ID::generate("__limiter__")),
+      permitsPerSecond(_permitsPerSecond)
+  {
+    CHECK_GT(permitsPerSecond, 0);
   }
 
   virtual void finalize()
@@ -78,8 +88,7 @@ public:
     }
 
     // No need to wait!
-    double rate = permits / duration.secs();
-    timeout = Seconds(1) / rate;
+    timeout = Seconds(1) / permitsPerSecond;
     return Nothing();
   }
 
@@ -97,8 +106,7 @@ private:
 
     promise->set(Nothing());
 
-    double rate = permits / duration.secs();
-    timeout = Seconds(1) / rate;
+    timeout = Seconds(1) / permitsPerSecond;
 
     // Repeat if necessary.
     if (!promises.empty()) {
@@ -106,8 +114,7 @@ private:
     }
   }
 
-  const int permits;
-  const Duration duration;
+  double permitsPerSecond;
 
   Timeout timeout;
 
@@ -122,6 +129,13 @@ inline RateLimiter::RateLimiter(int permits, const Duration& duration)
 }
 
 
+inline RateLimiter::RateLimiter(double permitsPerSecond)
+{
+  process = new RateLimiterProcess(permitsPerSecond);
+  spawn(process);
+}
+
+
 inline RateLimiter::~RateLimiter()
 {
   terminate(process);


[2/5] git commit: Created framework rate limits protobuf object which is loaded as JSON through master flags.

Posted by ya...@apache.org.
Created framework rate limits protobuf object which is loaded as JSON through master flags.

Review: https://reviews.apache.org/r/22425


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f9ffddf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f9ffddf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f9ffddf

Branch: refs/heads/master
Commit: 2f9ffddf98f544b778fdeee50a1ae09ec0e7c786
Parents: 4a0b349
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 16 17:21:24 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto | 30 ++++++++++++++++++++++++++++++
 src/master/flags.hpp      | 24 ++++++++++++++++++++++++
 src/master/master.cpp     | 12 ++++++++++++
 src/master/master.hpp     |  2 ++
 4 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 709b8b1..2f6be05 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -746,3 +746,33 @@ message ACLs {
   repeated ACL.HTTPGet http_get = 4;
   repeated ACL.HTTPPut http_put = 5;
 }
+
+
+/**
+ * Rate (queries per second, QPS) limit for messages from a framework to master.
+ * Strictly speaking they are the combined rate from all frameworks of the same
+ * principal.
+ */
+message RateLimit {
+  // Leaving QPS unset gives it unlimited rate (i.e., not throttled).
+  optional double qps = 1;
+
+  // Principal of framework(s) to be throttled. Should match
+  // FrameworkInfo.principal and Credential.principal (if using authentication).
+  required string principal = 2;
+}
+
+
+/**
+ * Collection of RateLimit.
+ * Frameworks without rate limits defined here are not throttled.
+ * TODO(xujyan): Currently when a framework is not specified in 'limits' it is
+ * not throttled. This can be done more explicitly by adding a RateLimit entry
+ * without setting its 'qps'. We should consider adding an optional
+ * 'aggregate_default_qps' which can be used to throttle the frameworks not
+ * specified here.
+ */
+message RateLimits {
+  // Items should have unique principals.
+  repeated RateLimit limits = 1;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 7850e45..47bb0dc 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -237,6 +237,29 @@ public:
         "                       }\n"
         "                     ]\n"
         "}");
+
+    add(&Flags::rate_limits,
+        "rate_limits",
+        "The value could be a JSON formatted string of rate limits\n"
+        "or a file path containing the JSON formatted rate limits used\n"
+        "for framework rate limiting.\n"
+        "Path could be of the form 'file:///path/to/file'\n"
+        "or '/path/to/file'.\n"
+        "\n"
+        "See the RateLimits protobuf in mesos.proto for the expected format.\n"
+        "\n"
+        "Example:\n"
+        "{\n"
+        "  \"limits\": [\n"
+        "    {\n"
+        "      \"principal\": \"foo\",\n"
+        "      \"qps\": 55.5\n"
+        "    },\n"
+        "    {\n"
+        "      \"principal\": \"bar\"\n"
+        "    }\n"
+        "  ]\n"
+        "}");
   }
 
   bool version;
@@ -264,6 +287,7 @@ public:
   bool authenticate_slaves;
   Option<std::string> credentials;
   Option<ACLs> acls;
+  Option<JSON::Object> rate_limits;
 };
 
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index fb5c770..c7357da 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -45,6 +45,7 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
+#include <stout/protobuf.hpp>
 #include <stout/stringify.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
@@ -350,6 +351,17 @@ void Master::initialize()
     LOG(INFO) << "Authorization enabled";
   }
 
+  if (flags.rate_limits.isSome()) {
+    Try<RateLimits> limits_ =
+      ::protobuf::parse<RateLimits>(flags.rate_limits.get());
+    if (limits_.isError()) {
+      EXIT(1) << "Invalid RateLimits format: " << limits_.error()
+              << " (see --rate_limits flag)";
+    }
+    limits = limits_.get();
+    LOG(INFO) << "Framework rate limiting enabled";
+  }
+
   hashmap<string, RoleInfo> roleInfos;
 
   // Add the default role.

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f9ffddf/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d55c4f5..d682613 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -510,6 +510,8 @@ private:
   // Principals of authenticated frameworks/slaves keyed by PID.
   hashmap<process::UPID, std::string> authenticated;
 
+  Option<RateLimits> limits;
+
   int64_t nextFrameworkId; // Used to give each framework a unique ID.
   int64_t nextOfferId;     // Used to give each slot offer a unique ID.
   int64_t nextSlaveId;     // Used to give each slave a unique ID.


[4/5] git commit: Changed MessageEvent and ExitedEvent to be copyable.

Posted by ya...@apache.org.
Changed MessageEvent and ExitedEvent to be copyable.

Review: https://reviews.apache.org/r/22426


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ebdb81ea
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ebdb81ea
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ebdb81ea

Branch: refs/heads/master
Commit: ebdb81ea99efc8fab0a50fb42b5b7a140787bc86
Parents: 2f9ffdd
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Sat Jun 7 21:10:55 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/event.hpp | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ebdb81ea/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index e30a4d4..3c860f4 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -76,6 +76,9 @@ struct MessageEvent : Event
   explicit MessageEvent(Message* _message)
     : message(_message) {}
 
+  MessageEvent(const MessageEvent& that)
+    : message(that.message == NULL ? NULL : new Message(*that.message)) {}
+
   virtual ~MessageEvent()
   {
     delete message;
@@ -89,8 +92,10 @@ struct MessageEvent : Event
   Message* const message;
 
 private:
-  // Not copyable, not assignable.
-  MessageEvent(const MessageEvent&);
+  // Keep MessageEvent not assignable even though we made it
+  // copyable.
+  // Note that we are violating the "rule of three" here but it helps
+  // keep the fields const.
   MessageEvent& operator = (const MessageEvent&);
 };
 
@@ -170,8 +175,9 @@ struct ExitedEvent : Event
   const UPID pid;
 
 private:
-  // Not copyable, not assignable.
-  ExitedEvent(const ExitedEvent&);
+  // Keep ExitedEvent not assignable even though we made it copyable.
+  // Note that we are violating the "rule of three" here but it helps
+  // keep the fields const.
   ExitedEvent& operator = (const ExitedEvent&);
 };
 


[3/5] git commit: Added "per-framework-principal" counters for messages from a scheduler to the master.

Posted by ya...@apache.org.
Added "per-framework-principal" counters for messages from a scheduler to the master.

Review: https://reviews.apache.org/r/22316


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d19e5880
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d19e5880
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d19e5880

Branch: refs/heads/master
Commit: d19e58802fff4f62e2422cded8bfff902d131b64
Parents: 7d7d94b
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 22:05:22 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                   |   1 +
 src/master/master.cpp             |  72 ++++-
 src/master/master.hpp             |  50 +++
 src/tests/rate_limiting_tests.cpp | 554 +++++++++++++++++++++++++++++++++
 4 files changed, 675 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ae6760..b1b7d2d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -987,6 +987,7 @@ mesos_tests_SOURCES =				\
   tests/monitor_tests.cpp			\
   tests/paths_tests.cpp				\
   tests/protobuf_io_tests.cpp			\
+  tests/rate_limiting_tests.cpp			\
   tests/reconciliation_tests.cpp		\
   tests/registrar_tests.cpp			\
   tests/repair_tests.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4a01b1a..fb5c770 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,6 +84,8 @@ using process::Time;
 using process::Timer;
 using process::UPID;
 
+using process::metrics::Counter;
+
 using memory::shared_ptr;
 
 namespace mesos {
@@ -750,6 +752,19 @@ void Master::exited(const UPID& pid)
 
 void Master::visit(const MessageEvent& event)
 {
+  // Increment the "messages_received" counter if the message is from
+  // a framework and such a counter is configured for it.
+  // See comments for 'Master::Metrics::Frameworks' and
+  // 'Master::Frameworks::principals' for details.
+  const Option<string> principal =
+    frameworks.principals.get(event.message->from);
+  if (principal.isSome()) {
+    CHECK(metrics.frameworks.contains(principal.get()));
+    Counter messages_received =
+      metrics.frameworks.get(principal.get()).get()->messages_received;
+    ++messages_received;
+  }
+
   // All messages are filtered when non-leading.
   if (!elected()) {
     VLOG(1) << "Dropping '" << event.message->name << "' message since "
@@ -773,6 +788,15 @@ void Master::visit(const MessageEvent& event)
   }
 
   ProtobufProcess<Master>::visit(event);
+
+  // Increment 'messages_processed' counter if it still exists.
+  // Note that it could be removed in handling
+  // 'UnregisterFrameworkMessage'.
+  if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+    Counter messages_processed =
+      metrics.frameworks.get(principal.get()).get()->messages_processed;
+    ++messages_processed;
+  }
 }
 
 
@@ -3567,6 +3591,30 @@ void Master::addFramework(Framework* framework)
 
   allocator->frameworkAdded(
       framework->id, framework->info, framework->resources);
+
+  // Export framework metrics.
+
+  // If the framework is authenticated, its principal should be in
+  // 'authenticated'. Otherwise look if it's supplied in
+  // FrameworkInfo.
+  Option<string> principal = authenticated.get(framework->pid);
+  if (principal.isNone() && framework->info.has_principal()) {
+    principal = framework->info.principal();
+  }
+
+  // Export framework metrics if a principal is specified.
+  if (principal.isSome()) {
+    CHECK(!frameworks.principals.contains(framework->pid));
+    frameworks.principals.put(framework->pid, principal.get());
+
+    // Create new framework metrics if this framework is the first
+    // one of this principal. Otherwise existing metrics are reused.
+    if (!metrics.frameworks.contains(principal.get())) {
+      metrics.frameworks.put(
+          principal.get(),
+          Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
+    }
+  }
 }
 
 
@@ -3574,7 +3622,7 @@ void Master::addFramework(Framework* framework)
 // event of a scheduler failover.
 void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
-  const UPID& oldPid = framework->pid;
+  const UPID oldPid = framework->pid;
 
   // There are a few failover cases to consider:
   //   1. The pid has changed. In this case we definitely want to
@@ -3622,6 +3670,13 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
         offer->framework_id(), offer->slave_id(), offer->resources());
     removeOffer(offer);
   }
+
+  // 'Failover' the framework's metrics. i.e., change the lookup key
+  // for its metrics to 'newPid'.
+  if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
+    frameworks.principals[newPid] = frameworks.principals[oldPid];
+    frameworks.principals.erase(oldPid);
+  }
 }
 
 
@@ -3702,7 +3757,20 @@ void Master::removeFramework(Framework* framework)
   // Remove the framework from authenticated.
   authenticated.erase(framework->pid);
 
-  // Remove it.
+  // Remove the framework's message counters.
+  const Option<string> principal = frameworks.principals.get(framework->pid);
+  if (principal.isSome()) {
+    frameworks.principals.erase(framework->pid);
+
+    // Remove the metrics for the principal if this framework is the
+    // last one with this principal.
+    if (!frameworks.principals.containsValue(principal.get())) {
+      CHECK(metrics.frameworks.contains(principal.get()));
+      metrics.frameworks.erase(principal.get());
+    }
+  }
+
+  // Remove the framework.
   frameworks.activated.erase(framework->id);
   allocator->frameworkRemoved(framework->id);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7a12185..d55c4f5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -484,6 +484,16 @@ private:
 
     hashmap<FrameworkID, Framework*> activated;
     boost::circular_buffer<memory::shared_ptr<Framework> > completed;
+
+    // Principals of frameworks keyed by PID.
+    // NOTE: Multiple PIDs can map to the same principal. The
+    // differences between this map and 'authenticated' are:
+    // 1) This map only includes *registered* frameworks. The mapping
+    //    is added when a framework (re-)registers.
+    // 2) This map includes unauthenticated frameworks (when Master
+    //    allows them) if they have principals specified in
+    //    FrameworkInfo.
+    hashmap<process::UPID, std::string> principals;
   } frameworks;
 
   hashmap<OfferID, Offer*> offers;
@@ -544,6 +554,46 @@ private:
     // Message counters.
     process::metrics::Counter dropped_messages;
 
+    // Metrics specific to frameworks of a common principal.
+    // These metrics have names prefixed by "frameworks/<principal>/".
+    // Constructing an instance adds its counters to process::metrics
+    // and destroying it removes them again, so an instance must not be
+    // copied (see the private declarations at the end).
+    struct Frameworks
+    {
+      // Counters for messages from all frameworks of this principal.
+      // Note: We only count messages from active scheduler
+      // *instances* while they are *registered*. i.e., messages
+      // prior to the completion of (re)registration
+      // (AuthenticateMessage and (Re)RegisterFrameworkMessage) and
+      // messages from an inactive scheduler instance (after the
+      // framework has failed over) are not counted.
+
+      // Framework messages received (before processing).
+      process::metrics::Counter messages_received;
+
+      // Framework messages processed.
+      // NOTE: This doesn't include dropped messages. Also due to
+      // Master's asynchronous nature, this doesn't necessarily mean
+      // the work requested by this message has finished.
+      process::metrics::Counter messages_processed;
+
+      // Creates the counters "frameworks/<principal>/messages_received"
+      // and "frameworks/<principal>/messages_processed" and adds them
+      // to process::metrics.
+      explicit Frameworks(const std::string& principal)
+        : messages_received("frameworks/" + principal + "/messages_received"),
+          messages_processed("frameworks/" + principal + "/messages_processed")
+      {
+        process::metrics::add(messages_received);
+        process::metrics::add(messages_processed);
+      }
+
+      // Removes the counters added by the constructor.
+      ~Frameworks()
+      {
+        process::metrics::remove(messages_received);
+        process::metrics::remove(messages_processed);
+      }
+
+    private:
+      // Non-copyable: a copy would refer to the same metric names and
+      // its destructor would remove them from process::metrics while
+      // the original instance is still alive. Declared but
+      // intentionally left undefined (C++03-compatible idiom).
+      Frameworks(const Frameworks&);
+      Frameworks& operator=(const Frameworks&);
+    };
+
+    // Per-framework-principal metrics keyed by the framework
+    // principal.
+    hashmap<std::string, process::Owned<Frameworks> > frameworks;
+
     // Messages from schedulers.
     process::metrics::Counter messages_register_framework;
     process::metrics::Counter messages_reregister_framework;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
new file mode 100644
index 0000000..baf912e
--- /dev/null
+++ b/src/tests/rate_limiting_tests.cpp
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include "master/allocator.hpp"
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::master::allocator::AllocatorProcess;
+
+using process::metrics::internal::MetricsProcess;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtLeast;
+using testing::Eq;
+using testing::Return;
+
+// Test fixture for framework API rate limiting, including the
+// per-principal API call metrics that the master exports. It adds
+// nothing on top of MesosTest.
+struct RateLimitingTest : MesosTest
+{
+};
+
+
+// Verify that message counters for a framework are added when a
+// framework registers and removed when it terminates.
+TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Names of the per-principal message counters exported by the
+  // master ("frameworks/<principal>/...").
+  const string messages_received =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  const string messages_processed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+
+  // Message counters not present before the framework registers.
+  {
+    // TODO(xujyan): It would be nice to refactor out the common
+    // metrics snapshot querying logic into a helper method/MACRO.
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(0u, metrics.values.count(messages_received));
+    EXPECT_EQ(0u, metrics.values.count(messages_processed));
+  }
+
+  MockScheduler sched;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+  AWAIT_READY(registered);
+
+  // Message counters added after the framework is registered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+  }
+
+  // Waiting for the allocator's frameworkRemoved ensures that
+  // Master::removeFramework has run; that is where the counters of
+  // the framework's principal are released.
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  EXPECT_EQ(DRIVER_STOPPED, driver->stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver->join());
+  delete driver;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // Message counters removed after the framework is unregistered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(0u, metrics.values.count(messages_received));
+    EXPECT_EQ(0u, metrics.values.count(messages_processed));
+  }
+
+  Shutdown();
+}
+
+
+// Verify that framework message counters work with frameworks of
+// different principals. Framework authentication is disabled so the
+// principals come from the FrameworkInfos alone.
+TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+{
+  // 1. Register two frameworks.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_principal("framework1");
+
+  MockScheduler sched1;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver1 =
+    new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched1, registered(driver1, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+    AWAIT_READY(registered);
+  }
+
+  FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_principal("framework2");
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(registered);
+  }
+
+  // 2. Verify that both frameworks have message counters added.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework1/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework1/messages_processed"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_processed"));
+  }
+
+  // 3. Remove a framework.
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  EXPECT_EQ(DRIVER_STOPPED, driver1->stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver1->join());
+  delete driver1;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // 4. Its message counters are deleted while the other framework's
+  //    counters stay.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/framework1/messages_received"));
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/framework1/messages_processed"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_processed"));
+  }
+
+  // 5. Clean up the remaining framework.
+  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+
+  Shutdown();
+}
+
+
+// Verify that if multiple frameworks use the same principal, they
+// share the same counters and removing one framework doesn't remove
+// the counters.
+TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Names of the counters shared by both frameworks, since both
+  // authenticate with DEFAULT_CREDENTIAL.
+  const string messages_received =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  const string messages_processed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+
+  MockScheduler sched1;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched1, registered(driver1, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+    AWAIT_READY(registered);
+  }
+
+  // 'sched2' uses the same principal (that of DEFAULT_CREDENTIAL).
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(registered);
+  }
+
+  // Message counters added after both frameworks are registered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+  }
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  EXPECT_EQ(DRIVER_STOPPED, driver1->stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver1->join());
+  delete driver1;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // Message counters are not removed after the first framework is
+  // unregistered because the second one still uses the principal.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+  }
+
+  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+
+  Shutdown();
+}
+
+
+// Verify that when a scheduler fails over, the new scheduler
+// instance continues to use the same counters.
+TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // 1. Launch the first (i.e., failing) scheduler and verify its
+  // counters.
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  {
+    // Grab this message so we can resend it.
+    Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+        RegisterFrameworkMessage(), _, master.get());
+
+    // Grab this message to get the scheduler's pid.
+    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1.start());
+
+    AWAIT_READY(registerFrameworkMessage);
+    AWAIT_READY(frameworkRegisteredMessage);
+    AWAIT_READY(frameworkId);
+
+    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+    // Send a duplicate RegisterFrameworkMessage. Master replies
+    // with a duplicate FrameworkRegisteredMessage.
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+    // Now one message has been received and processed by Master in
+    // addition to the RegisterFrameworkMessage.
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Settle to make sure messages_processed counters are updated.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+
+    // Verify the message counters.
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    // One message received and processed after the framework is
+    // registered.
+    const string messages_received =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string messages_processed =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  // 2. Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and verify that
+  // its counters are not reset to zero.
+
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId.get());
+
+  MesosSchedulerDriver driver2(
+      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+  // Scheduler driver ignores duplicate FrameworkRegisteredMessages.
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
+    .Times(1);
+
+  Future<Nothing> sched1Error;
+  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+    .WillOnce(FutureSatisfy(&sched1Error));
+
+  {
+    // Grab this message to get the scheduler's pid and to make sure we
+    // wait until the framework is registered.
+    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+    // Grab this message so we can resend it.
+    Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+      FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(reregisterFrameworkMessage);
+    AWAIT_READY(sched1Error);
+    AWAIT_READY(frameworkRegisteredMessage);
+
+    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    // Sending a duplicate ReregisterFrameworkMessage to test the
+    // message counters with the new scheduler instance.
+    process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Settle to make sure messages_processed counters are updated.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    // Another message after sched2 is reregistered plus the one from
+    // the sched1.
+    const string messages_received =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string messages_processed =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+
+  // driver1 was aborted when its framework failed over to driver2.
+  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver1.join());
+
+  Shutdown();
+}