You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/17 23:10:39 UTC

[3/5] git commit: Added "per-framework-principal" counters for messages from a scheduler to the master.

Added "per-framework-principal" counters for messages from a scheduler to the master.

Review: https://reviews.apache.org/r/22316


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d19e5880
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d19e5880
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d19e5880

Branch: refs/heads/master
Commit: d19e58802fff4f62e2422cded8bfff902d131b64
Parents: 7d7d94b
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 22:05:22 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                   |   1 +
 src/master/master.cpp             |  72 ++++-
 src/master/master.hpp             |  50 +++
 src/tests/rate_limiting_tests.cpp | 554 +++++++++++++++++++++++++++++++++
 4 files changed, 675 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ae6760..b1b7d2d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -987,6 +987,7 @@ mesos_tests_SOURCES =				\
   tests/monitor_tests.cpp			\
   tests/paths_tests.cpp				\
   tests/protobuf_io_tests.cpp			\
+  tests/rate_limiting_tests.cpp			\
   tests/reconciliation_tests.cpp		\
   tests/registrar_tests.cpp			\
   tests/repair_tests.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4a01b1a..fb5c770 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,6 +84,8 @@ using process::Time;
 using process::Timer;
 using process::UPID;
 
+using process::metrics::Counter;
+
 using memory::shared_ptr;
 
 namespace mesos {
@@ -750,6 +752,19 @@ void Master::exited(const UPID& pid)
 
 void Master::visit(const MessageEvent& event)
 {
+  // Increment the "messages_received" counter if the message is from
+  // a framework and such a counter is configured for it.
+  // See comments for 'Master::Metrics::Frameworks' and
+  // 'Master::Frameworks::principals' for details.
+  const Option<string> principal =
+    frameworks.principals.get(event.message->from);
+  if (principal.isSome()) {
+    CHECK(metrics.frameworks.contains(principal.get()));
+    Counter messages_received =
+      metrics.frameworks.get(principal.get()).get()->messages_received;
+    ++messages_received;
+  }
+
   // All messages are filtered when non-leading.
   if (!elected()) {
     VLOG(1) << "Dropping '" << event.message->name << "' message since "
@@ -773,6 +788,15 @@ void Master::visit(const MessageEvent& event)
   }
 
   ProtobufProcess<Master>::visit(event);
+
+  // Increment 'messages_processed' counter if it still exists.
+  // Note that it could be removed in handling
+  // 'UnregisterFrameworkMessage'.
+  if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+    Counter messages_processed =
+      metrics.frameworks.get(principal.get()).get()->messages_processed;
+    ++messages_processed;
+  }
 }
 
 
@@ -3567,6 +3591,30 @@ void Master::addFramework(Framework* framework)
 
   allocator->frameworkAdded(
       framework->id, framework->info, framework->resources);
+
+  // Export framework metrics.
+
+  // If the framework is authenticated, its principal should be in
+  // 'authenticated'. Otherwise look if it's supplied in
+  // FrameworkInfo.
+  Option<string> principal = authenticated.get(framework->pid);
+  if (principal.isNone() && framework->info.has_principal()) {
+    principal = framework->info.principal();
+  }
+
+  // Export framework metrics if a principal is specified.
+  if (principal.isSome()) {
+    CHECK(!frameworks.principals.contains(framework->pid));
+    frameworks.principals.put(framework->pid, principal.get());
+
+    // Create new framework metrics if this framework is the first
+    // one of this principal. Otherwise existing metrics are reused.
+    if (!metrics.frameworks.contains(principal.get())) {
+      metrics.frameworks.put(
+          principal.get(),
+          Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
+    }
+  }
 }
 
 
@@ -3574,7 +3622,7 @@ void Master::addFramework(Framework* framework)
 // event of a scheduler failover.
 void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
-  const UPID& oldPid = framework->pid;
+  const UPID oldPid = framework->pid;
 
   // There are a few failover cases to consider:
   //   1. The pid has changed. In this case we definitely want to
@@ -3622,6 +3670,13 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
         offer->framework_id(), offer->slave_id(), offer->resources());
     removeOffer(offer);
   }
+
+  // 'Failover' the framework's metrics. i.e., change the lookup key
+  // for its metrics to 'newPid'.
+  if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
+    frameworks.principals[newPid] = frameworks.principals[oldPid];
+    frameworks.principals.erase(oldPid);
+  }
 }
 
 
@@ -3702,7 +3757,20 @@ void Master::removeFramework(Framework* framework)
   // Remove the framework from authenticated.
   authenticated.erase(framework->pid);
 
-  // Remove it.
+  // Remove the framework's message counters.
+  const Option<string> principal = frameworks.principals.get(framework->pid);
+  if (principal.isSome()) {
+    frameworks.principals.erase(framework->pid);
+
+    // Remove the metrics for the principal if this framework is the
+    // last one with this principal.
+    if (!frameworks.principals.containsValue(principal.get())) {
+      CHECK(metrics.frameworks.contains(principal.get()));
+      metrics.frameworks.erase(principal.get());
+    }
+  }
+
+  // Remove the framework.
   frameworks.activated.erase(framework->id);
   allocator->frameworkRemoved(framework->id);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7a12185..d55c4f5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -484,6 +484,16 @@ private:
 
     hashmap<FrameworkID, Framework*> activated;
     boost::circular_buffer<memory::shared_ptr<Framework> > completed;
+
+    // Principals of frameworks keyed by PID.
+    // NOTE: Multiple PIDs can map to the same principal. The
+    // differences between this map and 'authenticated' are:
+    // 1) This map only includes *registered* frameworks. The mapping
+    //    is added when a framework (re-)registers.
+    // 2) This map includes unauthenticated frameworks (when Master
+    //    allows them) if they have principals specified in
+    //    FrameworkInfo.
+    hashmap<process::UPID, std::string> principals;
   } frameworks;
 
   hashmap<OfferID, Offer*> offers;
@@ -544,6 +554,46 @@ private:
     // Message counters.
     process::metrics::Counter dropped_messages;
 
+    // Metrics specific to frameworks of a common principal.
+    // These metrics have names prefixed by "frameworks/<principal>/".
+    struct Frameworks
+    {
+      // Counters for messages from all frameworks of this principal.
+      // Note: We only count messages from active scheduler
+      // *instances* while they are *registered*. i.e., messages
+      // prior to the completion of (re)registration
+      // (AuthenticateMessage and (Re)RegisterFrameworkMessage) and
+      // messages from an inactive scheduler instance (after the
+      // framework has failed over) are not counted.
+
+      // Framework messages received (before processing).
+      process::metrics::Counter messages_received;
+
+      // Framework messages processed.
+      // NOTE: This doesn't include dropped messages. Also due to
+      // Master's asynchronous nature, this doesn't necessarily mean
+      // the work requested by this message has finished.
+      process::metrics::Counter messages_processed;
+
+      explicit Frameworks(const std::string& principal)
+        : messages_received("frameworks/" + principal + "/messages_received"),
+          messages_processed("frameworks/" + principal + "/messages_processed")
+      {
+        process::metrics::add(messages_received);
+        process::metrics::add(messages_processed);
+      }
+
+      ~Frameworks()
+      {
+        process::metrics::remove(messages_received);
+        process::metrics::remove(messages_processed);
+      }
+    };
+
+    // Per-framework-principal metrics keyed by the framework
+    // principal.
+    hashmap<std::string, process::Owned<Frameworks> > frameworks;
+
     // Messages from schedulers.
     process::metrics::Counter messages_register_framework;
     process::metrics::Counter messages_reregister_framework;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
new file mode 100644
index 0000000..baf912e
--- /dev/null
+++ b/src/tests/rate_limiting_tests.cpp
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include "master/allocator.hpp"
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::master::allocator::AllocatorProcess;
+
+using process::metrics::internal::MetricsProcess;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtLeast;
+using testing::Eq;
+using testing::Return;
+
+// This test case covers tests related to framework API rate limiting
+// which includes metrics exporting for API call rates.
+class RateLimitingTest : public MesosTest {};
+
+
+// Verify that message counters for a framework are added when a
+// framework registers and removed when it terminates.
+TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Message counters not present before the framework registers.
+  {
+    // TODO(xujyan): It would be nice to refactor out the common
+    // metrics snapshot querying logic into a helper method/MACRO.
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_received"));
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_processed"));
+  }
+
+  MockScheduler sched;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+  AWAIT_READY(registered);
+
+  // Message counters added after the framework is registered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_received"));
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_processed"));
+  }
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  driver->stop();
+  driver->join();
+  delete driver;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // Message counters removed after the framework is unregistered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_received"));
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_processed"));
+  }
+
+  Shutdown();
+}
+
+
+// Verify that framework message counters work with frameworks of
+// different principals.
+TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+{
+  // 1. Register two frameworks.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_principal("framework1");
+
+  MockScheduler sched1;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver1 =
+    new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched1, registered(driver1, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+    AWAIT_READY(registered);
+  }
+
+  FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_principal("framework2");
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(registered);
+  }
+
+  // 2. Verify that both frameworks have message counters added.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework1/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework1/messages_processed"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_processed"));
+  }
+
+  // 3. Remove a framework.
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  driver1->stop();
+  driver1->join();
+  delete driver1;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // 4. Its message counters are deleted while the other framework's
+  //    counters stay.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/framework1/messages_received"));
+    EXPECT_EQ(
+        0u, metrics.values.count("frameworks/framework1/messages_processed"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/framework2/messages_processed"));
+  }
+
+  driver2.stop();
+  driver2.join();
+
+  Shutdown();
+}
+
+
+// Verify that if multiple frameworks use the same principal, they
+// share the same counters and removing one framework doesn't remove
+// the counters.
+TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockScheduler sched1;
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched1, registered(driver1, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+    AWAIT_READY(registered);
+  }
+
+  // 'sched2' uses the same principal "test-principal".
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  {
+    Future<Nothing> registered;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered));
+
+    ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+    AWAIT_READY(registered);
+  }
+
+  // Message counters added after both frameworks are registered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_processed"));
+  }
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  driver1->stop();
+  driver1->join();
+  delete driver1;
+
+  AWAIT_READY(frameworkRemoved);
+
+  // Message counters are not removed after the first framework is
+  // unregistered.
+  {
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_received"));
+    EXPECT_EQ(
+        1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+                                 "/messages_processed"));
+  }
+
+  driver2.stop();
+  driver2.join();
+
+  Shutdown();
+}
+
+
+// Verify that when a scheduler fails over, the new scheduler
+// instance continues to use the same counters.
+TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // 1. Launch the first (i.e., failing) scheduler and verify its
+  // counters.
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  {
+    // Grab this message so we can resend it.
+    Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+        RegisterFrameworkMessage(), _, master.get());
+
+    // Grab this message to get the scheduler's pid.
+    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+    driver1.start();
+
+    AWAIT_READY(registerFrameworkMessage);
+    AWAIT_READY(frameworkRegisteredMessage);
+    AWAIT_READY(frameworkId);
+
+    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+    // Send a duplicate RegisterFrameworkMessage. Master replies
+    // with a duplicate FrameworkRegisteredMessage.
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+    // Now one message has been received and processed by Master in
+    // addition to the RegisterFrameworkMessage.
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Settle to make sure messages_processed counters are updated.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+
+    // Verify the message counters.
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    // One message received and processed after the framework is
+    // registered.
+    const string& messages_received =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string& messages_processed =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  // 2. Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and verify that
+  // its counters are not reset to zero.
+
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId.get());
+
+  MesosSchedulerDriver driver2(
+      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+  // Scheduler driver ignores duplicate FrameworkRegisteredMessages.
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
+    .Times(1);
+
+  Future<Nothing> sched1Error;
+  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+    .WillOnce(FutureSatisfy(&sched1Error));
+
+  {
+    // Grab this message to get the scheduler's pid and to make sure we
+    // wait until the framework is registered.
+    Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+        Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+    // Grab this message so we can resend it.
+    Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+      FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+
+    driver2.start();
+
+    AWAIT_READY(reregisterFrameworkMessage);
+    AWAIT_READY(sched1Error);
+    AWAIT_READY(frameworkRegisteredMessage);
+
+    const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    // Sending a duplicate ReregisterFrameworkMessage to test the
+    // message counters with the new scheduler instance.
+    process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Settle to make sure messages_processed counters are updated.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+
+    Future<process::http::Response> response =
+      process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    AWAIT_READY(response);
+
+    EXPECT_SOME_EQ(
+        "application/json",
+        response.get().headers.get("Content-Type"));
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+    ASSERT_SOME(parse);
+
+    JSON::Object metrics = parse.get();
+
+    // Another message after sched2 is reregistered plus the one from
+    // sched1.
+    const string& messages_received =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string& messages_processed =
+      "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+
+  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+  EXPECT_EQ(DRIVER_STOPPED, driver1.join());
+
+  Shutdown();
+}