You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/08/07 09:14:26 UTC

[1/2] git commit: Added 'timed_tests.sh' script to help investigate the cause of hanging tests.

Repository: mesos
Updated Branches:
  refs/heads/tmp [created] 4a831c4cb


Added 'timed_tests.sh' script to help investigate the cause of hanging tests.

Review: https://reviews.apache.org/r/23700


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5fcb436
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5fcb436
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5fcb436

Branch: refs/heads/tmp
Commit: e5fcb436a2293ae5ac5b90cd382b591c62d3a2d7
Parents: 6208631
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jul 18 12:57:46 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Thu Aug 7 00:10:51 2014 -0700

----------------------------------------------------------------------
 support/timed_tests.sh | 113 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e5fcb436/support/timed_tests.sh
----------------------------------------------------------------------
diff --git a/support/timed_tests.sh b/support/timed_tests.sh
new file mode 100755
index 0000000..0b44716
--- /dev/null
+++ b/support/timed_tests.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/setsid bash
+
+function usage {  # Print the help text (synopsis and argument descriptions) to stdout.
+cat <<EOF
+
+Run Mesos tests within a duration and attach gdb if the tests time out. This
+script is expected to be run under the repository's root directory.
+
+Usage: $0 [-h] [-m <make_cmd>] <test_cmd> <duration>
+
+  test_cmd   Command that runs the tests.
+             e.g., MESOS_VERBOSE=1 make check GTEST_SHUFFLE=1
+  duration   Duration (in seconds) before the tests time out.
+             e.g., 3600, \$((160 * 60))
+  make_cmd   Optional command which is executed before 'test_cmd'.
+             e.g., make check GTEST_FILTER=
+  -h         Print this help message and exit
+EOF
+}
+
+function die {  # Write all arguments to stderr, then exit with status 1.
+  echo "$@" 1>&2
+  exit 1
+}
+
+while getopts ":hm:" opt; do  # Leading ':' selects getopts' silent error reporting.
+  case "$opt" in
+    m)  # Optional build command to run before the tests.
+      make_cmd="${OPTARG}"
+      ;;
+    h)
+      usage
+      exit 0
+      ;;
+    *)  # Any other result, including '-m' given without its argument.
+      echo "Unknown option: -$OPTARG"
+      usage
+      exit 1
+      ;;
+  esac
+done
+
+shift $(($OPTIND - 1))  # Drop the parsed options; only positionals remain.
+if test ${#} -ne 2; then
+  usage
+  exit 1
+fi
+
+test_cmd=$1
+duration=$2
+
+echo "This script runs with session id $$ and can be terminated by: pkill -s $$"
+
+if [ "$make_cmd" ]; then  # Bootstrap, configure and build only when -m was given.
+  ./bootstrap
+
+  mkdir -p build && pushd build
+
+  ../configure
+
+  echo -n `date`; echo ": start running $make_cmd"
+
+  eval "$make_cmd"
+
+  echo -n `date`; echo ": finished running $make_cmd"
+else  # No build requested: just enter the build directory (created if missing).
+  mkdir -p build && pushd build
+fi
+
+echo -n `date`; echo ": start running $test_cmd"
+
+start=$(date +"%s")
+eval "$test_cmd" &  # Quoted (like make_cmd) so eval receives the command unsplit and unglobbed.
+
+test_cmd_pid=$!
+
+while [ $(($(date +"%s") - $start)) -lt $duration ]; do  # Poll until the deadline.
+  running=`ps p $test_cmd_pid h | wc -l`
+  if [ $running -eq 0 ]; then
+    wait $test_cmd_pid; status=$?  # Reap the finished job to obtain its exit status.
+    echo "Test finished"; exit $status  # Report test failures instead of always 0.
+  fi
+
+  sleep 5
+done
+
+echo -n `date`; echo ": process still running after $duration seconds"
+
+tmp=`mktemp XXXXX`  # Scratch file (relative template, so in the current directory).
+echo "thread apply all bt" > $tmp
+
+for test_pid in $( pgrep -s 0 ); do  # Session 0 means pgrep's own session, i.e. this script's.
+  cat <<EOF
+==========
+
+Attaching gdb to `ps o pid,cmd p $test_pid h`
+
+==========
+EOF
+
+  gdb attach $test_pid < $tmp  # gdb reads the backtrace command from stdin.
+done
+
+rm $tmp
+
+popd
+
+echo "Test failed and killing the stuck test process"
+
+# Kill all processes in the test process session.
+pkill -s 0
+
+exit 1
\ No newline at end of file


[2/2] git commit: Improved framework rate limiting by imposing the max number of outstanding messages per framework principal.

Posted by ya...@apache.org.
Improved framework rate limiting by imposing the max number of outstanding messages per framework principal.

Review: https://reviews.apache.org/r/24343


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a831c4c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a831c4c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a831c4c

Branch: refs/heads/tmp
Commit: 4a831c4cb249ef1fcf8a47cc76e935ea87458da9
Parents: e5fcb43
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Aug 6 16:31:53 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Thu Aug 7 00:11:33 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                 |  25 +++-
 src/examples/load_generator_framework.cpp |   7 +-
 src/master/master.cpp                     | 109 +++++++++++++----
 src/master/master.hpp                     |  37 +++++-
 src/tests/rate_limiting_tests.cpp         | 160 +++++++++++++++++++++++++
 5 files changed, 303 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 628cce1..efb4239 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -779,26 +779,39 @@ message ACLs {
  * principal.
  */
 message RateLimit {
-  // Leaving QPS unset gives it unlimited rate (i.e., not throttled).
+  // Leaving QPS unset gives it unlimited rate (i.e., not throttled),
+  // which also implies unlimited capacity.
   optional double qps = 1;
 
   // Principal of framework(s) to be throttled. Should match
   // FrameworkInfo.princpal and Credential.principal (if using authentication).
   required string principal = 2;
+
+  // Max number of outstanding messages from frameworks of this principal
+  // allowed by master before the next message is dropped and an error is sent
+  // back to the sender. Messages received before the capacity is reached are
+  // still going to be processed after the error is sent.
+  // If unspecified, this principal is assigned unlimited capacity.
+  // NOTE: This value is ignored if 'qps' is not set.
+  optional uint64 capacity = 3;
 }
 
 
 /**
  * Collection of RateLimit.
- * Frameworks without rate limits defined here are not throttled
- * unless 'aggregate_default_qps' is specified.
+ * Frameworks without rate limits defined here are not throttled unless
+ * 'aggregate_default_qps' is specified.
  */
 message RateLimits {
   // Items should have unique principals.
   repeated RateLimit limits = 1;
 
-  // All the frameworks not specified in 'limits' get this default
-  // rate. This rate is an aggregate rate for all of them, i.e.,
-  // their combined traffic is throttled together at this rate.
+  // All the frameworks not specified in 'limits' get this default rate.
+  // This rate is an aggregate rate for all of them, i.e., their combined
+  // traffic is throttled together at this rate.
   optional double aggregate_default_qps = 2;
+
+  // All the frameworks not specified in 'limits' get this default capacity.
+  // This is an aggregate value similar to 'aggregate_default_qps'.
+  optional uint64 aggregate_default_capacity = 3;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/examples/load_generator_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/load_generator_framework.cpp b/src/examples/load_generator_framework.cpp
index 7d94c49..01a567b 100644
--- a/src/examples/load_generator_framework.cpp
+++ b/src/examples/load_generator_framework.cpp
@@ -211,7 +211,12 @@ public:
       const SlaveID&,
       int) {}
 
-  virtual void error(SchedulerDriver*, const string&) {}
+  virtual void error(SchedulerDriver*, const string& error)
+  {
+    // Any error reported here is fatal: terminate the process with EXIT
+    // because we cannot interrupt LoadGenerator's long-running loop.
+    EXIT(1) << "Error received: " << error;
+  }
 
 private:
   LoadGenerator* generator;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 97e4340..d279edb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,7 +84,6 @@ using process::Owned;
 using process::PID;
 using process::Process;
 using process::Promise;
-using process::RateLimiter;
 using process::Time;
 using process::Timer;
 using process::UPID;
@@ -371,12 +370,18 @@ void Master::initialize()
                 << ". It must be a positive number";
       }
 
-      limiters.put(
-          limit_.principal(),
-          limit_.has_qps()
-            ? Option<Owned<RateLimiter> >::some(
-                Owned<RateLimiter>(new RateLimiter(limit_.qps())))
-            : Option<Owned<RateLimiter> >::none());
+      if (limit_.has_qps()) {
+        Option<uint64_t> capacity;
+        if (limit_.has_capacity()) {
+          capacity = limit_.capacity();
+        }
+        limiters.put(
+            limit_.principal(),
+            Owned<BoundedRateLimiter>(
+                new BoundedRateLimiter(limit_.qps(), capacity)));
+      } else {
+        limiters.put(limit_.principal(), None());
+      }
     }
 
     if (flags.rate_limits.get().has_aggregate_default_qps() &&
@@ -387,8 +392,13 @@ void Master::initialize()
     }
 
     if (flags.rate_limits.get().has_aggregate_default_qps()) {
-      defaultLimiter = Owned<RateLimiter>(
-          new RateLimiter(flags.rate_limits.get().aggregate_default_qps()));
+      Option<uint64_t> capacity;
+      if (flags.rate_limits.get().has_aggregate_default_capacity()) {
+        capacity = flags.rate_limits.get().aggregate_default_capacity();
+      }
+      defaultLimiter = Owned<BoundedRateLimiter>(
+          new BoundedRateLimiter(
+              flags.rate_limits.get().aggregate_default_qps(), capacity));
     }
 
     LOG(INFO) << "Framework rate limiting enabled";
@@ -851,9 +861,6 @@ void Master::visit(const MessageEvent& event)
     return;
   }
 
-  // Necessary to disambiguate below.
-  typedef void(Self::*F)(const MessageEvent&);
-
   // Throttle the message if it's a framework message and a
   // RateLimiter is configured for the framework's principal.
   // The framework is throttled by the default RateLimiter if:
@@ -864,30 +871,42 @@ void Master::visit(const MessageEvent& event)
   // 1) the default RateLimiter is not configured to handle case 2)
   //    above. (or)
   // 2) the principal exists in RateLimits but 'qps' is not set.
-  if (principal.isSome() &&
-      limiters.contains(principal.get()) &&
-      limiters[principal.get()].isSome()) {
-    limiters[principal.get()].get()->acquire()
-      .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+  Option<Owned<BoundedRateLimiter> > limiter;
+  if (principal.isSome() && limiters.contains(principal.get())) {
+    limiter = limiters[principal.get()];
   } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
-             isRegisteredFramework &&
-             defaultLimiter.isSome()) {
-    defaultLimiter.get()->acquire()
-      .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+             isRegisteredFramework) {
+    limiter = defaultLimiter;
+  }
+
+  // Now throttle the message if a limiter is found, unless its
+  // capacity is already reached.
+  if (limiter.isSome()) {
+    if (limiter.get()->capacity.isNone() ||
+        limiter.get()->messages < limiter.get()->capacity.get()) {
+      limiter.get()->messages++;
+      limiter.get()->limiter->acquire()
+        .onReady(defer(self(), &Self::throttled, event, principal));
+    } else {
+      exceededCapacity(
+          event,
+          principal,
+          limiter.get()->capacity.get());
+    }
   } else {
     _visit(event);
   }
 }
 
 
-void Master::visit(const process::ExitedEvent& event)
+void Master::visit(const ExitedEvent& event)
 {
   // See comments in 'visit(const MessageEvent& event)' for which
   // RateLimiter is used to throttle this UPID and when it is not
   // throttled.
   // Note that throttling ExitedEvent is necessary so the order
   // between MessageEvents and ExitedEvents from the same PID is
-  // maintained.
+  // maintained. Also ExitedEvents are not subject to the capacity.
   bool isRegisteredFramework = frameworks.principals.contains(event.pid);
   const Option<string> principal = isRegisteredFramework
     ? frameworks.principals[event.pid]
@@ -899,12 +918,12 @@ void Master::visit(const process::ExitedEvent& event)
   if (principal.isSome() &&
       limiters.contains(principal.get()) &&
       limiters[principal.get()].isSome()) {
-    limiters[principal.get()].get()->acquire()
+    limiters[principal.get()].get()->limiter->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
   } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
              isRegisteredFramework &&
              defaultLimiter.isSome()) {
-    defaultLimiter.get()->acquire()
+    defaultLimiter.get()->limiter->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
   } else {
     _visit(event);
@@ -912,6 +931,22 @@ void Master::visit(const process::ExitedEvent& event)
 }
 
 
+void Master::throttled(
+    const MessageEvent& event,
+    const Option<std::string>& principal)
+{
+  // Release this event's slot in whichever limiter throttled it. A
+  // principal absent from 'limiters' was throttled by 'defaultLimiter'.
+  if (principal.isSome() && limiters.contains(principal.get())) {
+    limiters[principal.get()].get()->messages--;
+  } else {
+    defaultLimiter.get()->messages--;
+  }
+
+  _visit(event);
+}
+
+
 void Master::_visit(const MessageEvent& event)
 {
   // Obtain the principal before processing the Message because the
@@ -936,6 +971,30 @@ void Master::_visit(const MessageEvent& event)
 }
 
 
+void Master::exceededCapacity(
+    const MessageEvent& event,
+    const Option<string>& principal,
+    uint64_t capacity)
+{
+  const string suffix =
+    principal.isSome() ? "(" + principal.get() + ")" : "";
+  LOG(WARNING) << "Dropping message " << event.message->name << " from "
+               << event.message->from << suffix
+               << ": capacity(" << capacity << ") exceeded";
+
+  // Tell the sender why its message was dropped. The error aborts the
+  // scheduler driver, which then sends a DeactivateFrameworkMessage
+  // back. That message may be dropped too, which is acceptable: the
+  // scheduler has already been told about an unrecoverable error and
+  // is expected to take action to recover.
+  FrameworkErrorMessage error;
+  error.set_message(
+      "Message " + event.message->name +
+      " dropped: capacity(" + stringify(capacity) + ") exceeded");
+  send(event.message->from, error);
+}
+
+
 void Master::_visit(const ExitedEvent& event)
 {
   Process<Master>::visit(event);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d8a4d9e..29e8f49 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -241,10 +241,25 @@ protected:
   virtual void visit(const process::MessageEvent& event);
   virtual void visit(const process::ExitedEvent& event);
 
+  // Invoked when the message is ready to be executed after
+  // being throttled.
+  // 'principal' being None indicates it is throttled by
+  // 'defaultLimiter'.
+  void throttled(
+      const process::MessageEvent& event,
+      const Option<std::string>& principal);
+
   // Continuations of visit().
   void _visit(const process::MessageEvent& event);
   void _visit(const process::ExitedEvent& event);
 
+  // Helper method invoked when the capacity for a framework
+  // principal is exceeded.
+  void exceededCapacity(
+      const process::MessageEvent& event,
+      const Option<std::string>& principal,
+      uint64_t capacity);
+
   // Recovers state from the registrar.
   process::Future<Nothing> recover();
   void recoveredSlavesTimeout(const Registry& registry);
@@ -744,14 +759,30 @@ private:
       const FrameworkInfo& frameworkInfo,
       const process::UPID& from);
 
-  // RateLimiters keyed by the framework principal.
+  struct BoundedRateLimiter // A RateLimiter paired with an optional cap on outstanding messages.
+  {
+    BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
+      : limiter(new process::RateLimiter(qps)),
+        capacity(_capacity),
+        messages(0) {}
+
+    process::Owned<process::RateLimiter> limiter; // Constructed from 'qps'.
+    const Option<uint64_t> capacity; // Fixed at construction.
+
+    // Number of outstanding messages for this RateLimiter, i.e.
+    // MessageEvents waiting on 'limiter'. NOTE: ExitedEvents are
+    // throttled but not counted towards the capacity here.
+    uint64_t messages;
+  };
+
+  // BoundedRateLimiters keyed by the framework principal.
   // Like Metrics::Frameworks, all frameworks of the same principal
   // are throttled together at a common rate limit.
-  hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
+  hashmap<std::string, Option<process::Owned<BoundedRateLimiter> > > limiters;
 
   // The default limiter is for frameworks not specified in
   // 'flags.rate_limits'.
-  Option<process::Owned<process::RateLimiter> > defaultLimiter;
+  Option<process::Owned<BoundedRateLimiter> > defaultLimiter;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index fc23a19..4adebd1 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -954,6 +954,166 @@ TEST_F(RateLimitingTest, SchedulerFailover)
   Shutdown();
 }
 
+
+TEST_F(RateLimitingTest, CapacityReached)
+{
+  master::Flags flags = CreateMasterFlags();
+  RateLimits limits;
+  RateLimit* limit = limits.mutable_limits()->Add();
+  limit->set_principal(DEFAULT_CREDENTIAL.principal());
+  limit->set_qps(1);
+  limit->set_capacity(2);
+  flags.rate_limits = limits;
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Clock::pause();
+
+  // Advance before the test so that the 1st call to Metrics endpoint
+  // is not throttled. MetricsProcess which hosts the endpoint
+  // throttles requests at 2qps and its singleton instance is shared
+  // across tests.
+  Clock::advance(Milliseconds(501));
+
+  MockScheduler sched;
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  // Use a long failover timeout so the master doesn't unregister the
+  // framework right away when it aborts.
+  frameworkInfo.set_failover_timeout(10);
+
+  // Create MesosSchedulerDriver on the heap because of the need to
+  // destroy it during the test due to MESOS-1456.
+  MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(driver, _, _))
+    .Times(1);
+
+  // Grab the stuff we need to replay the RegisterFrameworkMessage.
+  Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+      RegisterFrameworkMessage(), _, master.get());
+  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+  ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+  AWAIT_READY(registerFrameworkMessage);
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+  // Keep sending duplicate RegisterFrameworkMessages. Master sends
+  // FrameworkRegisteredMessage back after processing each of them.
+  {
+    Future<process::Message> duplicateFrameworkRegisteredMessage =
+      FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+                     master.get(),
+                     _);
+
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+    // The first message is not throttled because it's at the head of
+    // the queue.
+    AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+    // Verify that one message is received and processed (after
+    // registration).
+    JSON::Object metrics = METRICS_SNAPSHOT;
+
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  // The subsequent messages are going to be throttled.
+  Future<process::Message> frameworkErrorMessage =
+    FUTURE_MESSAGE(Eq(FrameworkErrorMessage().GetTypeName()),
+                   master.get(),
+                   _);
+
+  // Send two messages which will be queued up. This will reach but not
+  // exceed the capacity (2) configured above.
+  for (int i = 0; i < 2; i++) {
+    process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+  }
+
+  // Settle to make sure no error is sent just yet.
+  Clock::settle();
+  EXPECT_TRUE(frameworkErrorMessage.isPending());
+
+  // The 3rd message results in an immediate error.
+  Future<Nothing> error;
+  EXPECT_CALL(sched, error(
+      driver,
+      "Message mesos.internal.RegisterFrameworkMessage dropped: capacity(2) "
+      "exceeded"))
+    .WillOnce(FutureSatisfy(&error));
+
+  process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+  AWAIT_READY(frameworkErrorMessage);
+
+  // Settle to make sure scheduler aborts and its
+  // DeactivateFrameworkMessage is received by master.
+  Clock::settle();
+
+  AWAIT_READY(error);
+
+  // Stop the driver but indicate it wants to failover.
+  EXPECT_EQ(DRIVER_ABORTED, driver->stop(true));
+  EXPECT_EQ(DRIVER_STOPPED, driver->join());
+  delete driver;
+
+  // Advance just over half a second for the 2qps metrics endpoint.
+  Clock::advance(Milliseconds(501));
+
+  {
+    JSON::Object metrics = METRICS_SNAPSHOT;
+
+    const string& messages_received =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+    EXPECT_EQ(1u, metrics.values.count(messages_received));
+    EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value);
+    const string& messages_processed =
+      "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+    EXPECT_EQ(1u, metrics.values.count(messages_processed));
+    // Of the 5 received, 4 are unprocessed: 2 still queued and 2 dropped.
+    EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+  }
+
+  // Advance three times for the two pending messages and the exited
+  // event to be processed.
+  for (int i = 0; i < 3; i++) {
+    Clock::advance(Milliseconds(1001));
+    Clock::settle();
+  }
+
+  // Counters are not removed because the scheduler is not
+  // unregistered and the master expects it to failover.
+  JSON::Object metrics = METRICS_SNAPSHOT;
+
+  const string& messages_received =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+  EXPECT_EQ(1u, metrics.values.count(messages_received));
+  EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value);
+  const string& messages_processed =
+    "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+  EXPECT_EQ(1u, metrics.values.count(messages_processed));
+  // The 2 queued messages are now processed; the 2 dropped ones never are.
+  EXPECT_EQ(3, metrics.values[messages_processed].as<JSON::Number>().value);
+
+  Shutdown();
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {