You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/08/07 09:14:26 UTC
[1/2] git commit: Added 'timed_tests.sh' script to help investigate
the cause of hanging tests.
Repository: mesos
Updated Branches:
refs/heads/tmp [created] 4a831c4cb
Added 'timed_tests.sh' script to help investigate the cause of hanging tests.
Review: https://reviews.apache.org/r/23700
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5fcb436
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5fcb436
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5fcb436
Branch: refs/heads/tmp
Commit: e5fcb436a2293ae5ac5b90cd382b591c62d3a2d7
Parents: 6208631
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Fri Jul 18 12:57:46 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Thu Aug 7 00:10:51 2014 -0700
----------------------------------------------------------------------
support/timed_tests.sh | 113 ++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 113 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e5fcb436/support/timed_tests.sh
----------------------------------------------------------------------
diff --git a/support/timed_tests.sh b/support/timed_tests.sh
new file mode 100755
index 0000000..0b44716
--- /dev/null
+++ b/support/timed_tests.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/setsid bash
+
+function usage {
+cat <<EOF
+
+Run Mesos tests within a duration and attach gdb if the tests time out. This
+script is expected to be run under the repository's root directory.
+
+Usage: $0 [-h] [-m <make_cmd>] <test_cmd> <duration>
+
+ test_cmd Command that runs the tests.
+ e.g., MESOS_VERBOSE=1 make check GTEST_SHUFFLE=1
+ duration Duration (in seconds) before the tests time out.
+ e.g., 3600, \$((160 * 60))
+ make_cmd Optional command which is executed before 'test_cmd'.
+ e.g., make check GTEST_FILTER=
+ -h Print this help message and exit
+EOF
+}
+
+die () {
+ echo >&2 "$@"
+ exit 1
+}
+
+while getopts ":hm:" opt; do
+ case "$opt" in
+ m)
+ make_cmd="${OPTARG}"
+ ;;
+ h)
+ usage
+ exit 0
+ ;;
+ *)
+ echo "Unknown option: -$OPTARG"
+ usage
+ exit 1
+ ;;
+ esac
+done
+
+shift $(($OPTIND - 1))
+if test ${#} -ne 2; then
+ usage
+ exit 1
+fi
+
+test_cmd=$1
+duration=$2
+
+echo "This script runs with session id $$ and can be terminated by: pkill -s $$"
+
+if [ "$make_cmd" ]; then
+ ./bootstrap
+
+ mkdir -p build && pushd build
+
+ ../configure
+
+ echo -n `date`; echo ": start running $make_cmd"
+
+ eval "$make_cmd"
+
+ echo -n `date`; echo ": finished running $make_cmd"
+else
+ mkdir -p build && pushd build
+fi
+
+echo -n `date`; echo ": start running $test_cmd"
+
+start=$(date +"%s")
+eval $test_cmd &
+
+test_cmd_pid=$!
+
+while [ $(($(date +"%s") - $start)) -lt $duration ]; do
+ running=`ps p $test_cmd_pid h | wc -l`
+ if [ $running -eq 0 ]; then
+ echo "Test finished"
+ exit 0
+ fi
+
+ sleep 5
+done
+
+echo -n `date`; echo ": process still running after $duration seconds"
+
+tmp=`mktemp XXXXX`
+echo "thread apply all bt" > $tmp
+
+for test_pid in $( pgrep -s 0 ); do
+ cat <<EOF
+==========
+
+Attaching gdb to `ps o pid,cmd p $test_pid h`
+
+==========
+EOF
+
+ gdb attach $test_pid < $tmp
+done
+
+rm $tmp
+
+popd
+
+echo "Test failed and killing the stuck test process"
+
+# Kill all processes in the test process session.
+pkill -s 0
+
+exit 1
\ No newline at end of file
[2/2] git commit: Improved framework rate limiting by imposing the
max number of outstanding messages per framework principal.
Posted by ya...@apache.org.
Improved framework rate limiting by imposing the max number of outstanding messages per framework principal.
Review: https://reviews.apache.org/r/24343
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a831c4c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a831c4c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a831c4c
Branch: refs/heads/tmp
Commit: 4a831c4cb249ef1fcf8a47cc76e935ea87458da9
Parents: e5fcb43
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Wed Aug 6 16:31:53 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Thu Aug 7 00:11:33 2014 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 25 +++-
src/examples/load_generator_framework.cpp | 7 +-
src/master/master.cpp | 109 +++++++++++++----
src/master/master.hpp | 37 +++++-
src/tests/rate_limiting_tests.cpp | 160 +++++++++++++++++++++++++
5 files changed, 303 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 628cce1..efb4239 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -779,26 +779,39 @@ message ACLs {
* principal.
*/
message RateLimit {
- // Leaving QPS unset gives it unlimited rate (i.e., not throttled).
+ // Leaving QPS unset gives it unlimited rate (i.e., not throttled),
+ // which also implies unlimited capacity.
optional double qps = 1;
// Principal of framework(s) to be throttled. Should match
// FrameworkInfo.princpal and Credential.principal (if using authentication).
required string principal = 2;
+
+ // Max number of outstanding messages from frameworks of this principal
+ // allowed by master before the next message is dropped and an error is sent
+ // back to the sender. Messages received before the capacity is reached are
+ // still going to be processed after the error is sent.
+ // If unspecified, this principal is assigned unlimited capacity.
+ // NOTE: This value is ignored if 'qps' is not set.
+ optional uint64 capacity = 3;
}
/**
* Collection of RateLimit.
- * Frameworks without rate limits defined here are not throttled
- * unless 'aggregate_default_qps' is specified.
+ * Frameworks without rate limits defined here are not throttled unless
+ * 'aggregate_default_qps' is specified.
*/
message RateLimits {
// Items should have unique principals.
repeated RateLimit limits = 1;
- // All the frameworks not specified in 'limits' get this default
- // rate. This rate is an aggregate rate for all of them, i.e.,
- // their combined traffic is throttled together at this rate.
+ // All the frameworks not specified in 'limits' get this default rate.
+ // This rate is an aggregate rate for all of them, i.e., their combined
+ // traffic is throttled together at this rate.
optional double aggregate_default_qps = 2;
+
+ // All the frameworks not specified in 'limits' get this default capacity.
+ // This is an aggregate value similar to 'aggregate_default_qps'.
+ optional uint64 aggregate_default_capacity = 3;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/examples/load_generator_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/load_generator_framework.cpp b/src/examples/load_generator_framework.cpp
index 7d94c49..01a567b 100644
--- a/src/examples/load_generator_framework.cpp
+++ b/src/examples/load_generator_framework.cpp
@@ -211,7 +211,12 @@ public:
const SlaveID&,
int) {}
- virtual void error(SchedulerDriver*, const string&) {}
+ virtual void error(SchedulerDriver*, const string& error)
+ {
+ // Terminating process with EXIT here because we cannot interrupt
+ // LoadGenerator's long-running loop.
+ EXIT(1) << "Error received: " << error;
+ }
private:
LoadGenerator* generator;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 97e4340..d279edb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,7 +84,6 @@ using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
-using process::RateLimiter;
using process::Time;
using process::Timer;
using process::UPID;
@@ -371,12 +370,18 @@ void Master::initialize()
<< ". It must be a positive number";
}
- limiters.put(
- limit_.principal(),
- limit_.has_qps()
- ? Option<Owned<RateLimiter> >::some(
- Owned<RateLimiter>(new RateLimiter(limit_.qps())))
- : Option<Owned<RateLimiter> >::none());
+ if (limit_.has_qps()) {
+ Option<uint64_t> capacity;
+ if (limit_.has_capacity()) {
+ capacity = limit_.capacity();
+ }
+ limiters.put(
+ limit_.principal(),
+ Owned<BoundedRateLimiter>(
+ new BoundedRateLimiter(limit_.qps(), capacity)));
+ } else {
+ limiters.put(limit_.principal(), None());
+ }
}
if (flags.rate_limits.get().has_aggregate_default_qps() &&
@@ -387,8 +392,13 @@ void Master::initialize()
}
if (flags.rate_limits.get().has_aggregate_default_qps()) {
- defaultLimiter = Owned<RateLimiter>(
- new RateLimiter(flags.rate_limits.get().aggregate_default_qps()));
+ Option<uint64_t> capacity;
+ if (flags.rate_limits.get().has_aggregate_default_capacity()) {
+ capacity = flags.rate_limits.get().aggregate_default_capacity();
+ }
+ defaultLimiter = Owned<BoundedRateLimiter>(
+ new BoundedRateLimiter(
+ flags.rate_limits.get().aggregate_default_qps(), capacity));
}
LOG(INFO) << "Framework rate limiting enabled";
@@ -851,9 +861,6 @@ void Master::visit(const MessageEvent& event)
return;
}
- // Necessary to disambiguate below.
- typedef void(Self::*F)(const MessageEvent&);
-
// Throttle the message if it's a framework message and a
// RateLimiter is configured for the framework's principal.
// The framework is throttled by the default RateLimiter if:
@@ -864,30 +871,42 @@ void Master::visit(const MessageEvent& event)
// 1) the default RateLimiter is not configured to handle case 2)
// above. (or)
// 2) the principal exists in RateLimits but 'qps' is not set.
- if (principal.isSome() &&
- limiters.contains(principal.get()) &&
- limiters[principal.get()].isSome()) {
- limiters[principal.get()].get()->acquire()
- .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ Option<Owned<BoundedRateLimiter> > limiter;
+ if (principal.isSome() && limiters.contains(principal.get())) {
+ limiter = limiters[principal.get()];
} else if ((principal.isNone() || !limiters.contains(principal.get())) &&
- isRegisteredFramework &&
- defaultLimiter.isSome()) {
- defaultLimiter.get()->acquire()
- .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
+ isRegisteredFramework) {
+ limiter = defaultLimiter;
+ }
+
+ // Now throttle the message if a limiter is found, unless its
+ // capacity is already reached.
+ if (limiter.isSome()) {
+ if (limiter.get()->capacity.isNone() ||
+ limiter.get()->messages < limiter.get()->capacity.get()) {
+ limiter.get()->messages++;
+ limiter.get()->limiter->acquire()
+ .onReady(defer(self(), &Self::throttled, event, principal));
+ } else {
+ exceededCapacity(
+ event,
+ principal,
+ limiter.get()->capacity.get());
+ }
} else {
_visit(event);
}
}
-void Master::visit(const process::ExitedEvent& event)
+void Master::visit(const ExitedEvent& event)
{
// See comments in 'visit(const MessageEvent& event)' for which
// RateLimiter is used to throttle this UPID and when it is not
// throttled.
// Note that throttling ExitedEvent is necessary so the order
// between MessageEvents and ExitedEvents from the same PID is
- // maintained.
+ // maintained. Also, ExitedEvents are not subject to the capacity.
bool isRegisteredFramework = frameworks.principals.contains(event.pid);
const Option<string> principal = isRegisteredFramework
? frameworks.principals[event.pid]
@@ -899,12 +918,12 @@ void Master::visit(const process::ExitedEvent& event)
if (principal.isSome() &&
limiters.contains(principal.get()) &&
limiters[principal.get()].isSome()) {
- limiters[principal.get()].get()->acquire()
+ limiters[principal.get()].get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else if ((principal.isNone() || !limiters.contains(principal.get())) &&
isRegisteredFramework &&
defaultLimiter.isSome()) {
- defaultLimiter.get()->acquire()
+ defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else {
_visit(event);
@@ -912,6 +931,22 @@ void Master::visit(const process::ExitedEvent& event)
}
+void Master::throttled(
+ const MessageEvent& event,
+ const Option<std::string>& principal)
+{
+ // We already know a RateLimiter is used to throttle this event so
+ // here we only need to determine which.
+ if (principal.isSome()) {
+ limiters[principal.get()].get()->messages--;
+ } else {
+ defaultLimiter.get()->messages--;
+ }
+
+ _visit(event);
+}
+
+
void Master::_visit(const MessageEvent& event)
{
// Obtain the principal before processing the Message because the
@@ -936,6 +971,30 @@ void Master::_visit(const MessageEvent& event)
}
+void Master::exceededCapacity(
+ const MessageEvent& event,
+ const Option<string>& principal,
+ uint64_t capacity)
+{
+ LOG(WARNING) << "Dropping message " << event.message->name << " from "
+ << event.message->from
+ << (principal.isSome() ? "(" + principal.get() + ")" : "")
+ << ": capacity(" << capacity << ") exceeded";
+
+ // Send an error to the framework which will abort the scheduler
+ // driver.
+ // NOTE: The scheduler driver will send back a
+ // DeactivateFrameworkMessage which may be dropped as well but this
+ // should be fine because the scheduler is already informed of an
+ // unrecoverable error and should take action to recover.
+ FrameworkErrorMessage message;
+ message.set_message(
+ "Message " + event.message->name +
+ " dropped: capacity(" + stringify(capacity) + ") exceeded");
+ send(event.message->from, message);
+}
+
+
void Master::_visit(const ExitedEvent& event)
{
Process<Master>::visit(event);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d8a4d9e..29e8f49 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -241,10 +241,25 @@ protected:
virtual void visit(const process::MessageEvent& event);
virtual void visit(const process::ExitedEvent& event);
+ // Invoked when the message is ready to be executed after
+ // being throttled.
+ // 'principal' being None indicates it is throttled by
+ // 'defaultLimiter'.
+ void throttled(
+ const process::MessageEvent& event,
+ const Option<std::string>& principal);
+
// Continuations of visit().
void _visit(const process::MessageEvent& event);
void _visit(const process::ExitedEvent& event);
+ // Helper method invoked when the capacity for a framework
+ // principal is exceeded.
+ void exceededCapacity(
+ const process::MessageEvent& event,
+ const Option<std::string>& principal,
+ uint64_t capacity);
+
// Recovers state from the registrar.
process::Future<Nothing> recover();
void recoveredSlavesTimeout(const Registry& registry);
@@ -744,14 +759,30 @@ private:
const FrameworkInfo& frameworkInfo,
const process::UPID& from);
- // RateLimiters keyed by the framework principal.
+ struct BoundedRateLimiter
+ {
+ BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
+ : limiter(new process::RateLimiter(qps)),
+ capacity(_capacity),
+ messages(0) {}
+
+ process::Owned<process::RateLimiter> limiter;
+ const Option<uint64_t> capacity;
+
+ // Number of outstanding messages for this RateLimiter.
+ // NOTE: ExitedEvents are throttled but not counted towards
+ // the capacity here.
+ uint64_t messages;
+ };
+
+ // BoundedRateLimiters keyed by the framework principal.
// Like Metrics::Frameworks, all frameworks of the same principal
// are throttled together at a common rate limit.
- hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters;
+ hashmap<std::string, Option<process::Owned<BoundedRateLimiter> > > limiters;
// The default limiter is for frameworks not specified in
// 'flags.rate_limits'.
- Option<process::Owned<process::RateLimiter> > defaultLimiter;
+ Option<process::Owned<BoundedRateLimiter> > defaultLimiter;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index fc23a19..4adebd1 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -954,6 +954,166 @@ TEST_F(RateLimitingTest, SchedulerFailover)
Shutdown();
}
+
+TEST_F(RateLimitingTest, CapacityReached)
+{
+ master::Flags flags = CreateMasterFlags();
+ RateLimits limits;
+ RateLimit* limit = limits.mutable_limits()->Add();
+ limit->set_principal(DEFAULT_CREDENTIAL.principal());
+ limit->set_qps(1);
+ limit->set_capacity(2);
+ flags.rate_limits = limits;
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ Clock::pause();
+
+ // Advance before the test so that the 1st call to Metrics endpoint
+ // is not throttled. MetricsProcess which hosts the endpoint
+ // throttles requests at 2qps and its singleton instance is shared
+ // across tests.
+ Clock::advance(Milliseconds(501));
+
+ MockScheduler sched;
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+ // Use a long failover timeout so the master doesn't unregister the
+ // framework right away when it aborts.
+ frameworkInfo.set_failover_timeout(10);
+
+ // Create MesosSchedulerDriver on the heap because of the need to
+ // destroy it during the test due to MESOS-1456.
+ MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(driver, _, _))
+ .Times(1);
+
+ // Grab the stuff we need to replay the RegisterFrameworkMessage.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // Keep sending duplicate RegisterFrameworkMessages. Master sends
+ // FrameworkRegisteredMessage back after processing each of them.
+ {
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // The first message is not throttled because it's at the head of
+ // the queue.
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Verify that one message is received and processed (after
+ // registration).
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // The subsequent messages are going to be throttled.
+ Future<process::Message> frameworkErrorMessage =
+ FUTURE_MESSAGE(Eq(FrameworkErrorMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ // Send two messages which will be queued up. This will reach but not
+ // exceed the capacity.
+ for (int i = 0; i < 2; i++) {
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ }
+
+ // Settle to make sure no error is sent just yet.
+ Clock::settle();
+ EXPECT_TRUE(frameworkErrorMessage.isPending());
+
+ // The 3rd message results in an immediate error.
+ Future<Nothing> error;
+ EXPECT_CALL(sched, error(
+ driver,
+ "Message mesos.internal.RegisterFrameworkMessage dropped: capacity(2) "
+ "exceeded"))
+ .WillOnce(FutureSatisfy(&error));
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+ AWAIT_READY(frameworkErrorMessage);
+
+ // Settle to make sure scheduler aborts and its
+ // DeactivateFrameworkMessage is received by master.
+ Clock::settle();
+
+ AWAIT_READY(error);
+
+ // Stop the driver but indicate it wants to failover.
+ EXPECT_EQ(DRIVER_ABORTED, driver->stop(true));
+ EXPECT_EQ(DRIVER_STOPPED, driver->join());
+ delete driver;
+
+ // Wait for half a second for metrics endpoint.
+ Clock::advance(Milliseconds(501));
+
+ {
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value);
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ // Four messages not processed, two in the queue and two dropped.
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // Advance three times for the two pending messages and the exited
+ // event to be processed.
+ for (int i = 0; i < 3; i++) {
+ Clock::advance(Milliseconds(1001));
+ Clock::settle();
+ }
+
+ // Counters are not removed because the scheduler is not
+ // unregistered and the master expects it to failover.
+ JSON::Object metrics = METRICS_SNAPSHOT;
+
+ const string& messages_received =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value);
+ const string& messages_processed =
+ "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ // Two messages are dropped.
+ EXPECT_EQ(3, metrics.values[messages_processed].as<JSON::Number>().value);
+
+ Shutdown();
+}
+
} // namespace master {
} // namespace internal {
} // namespace mesos {