You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2014/06/17 23:10:39 UTC
[3/5] git commit: Added "per-framework-principal" counters for
messages from a scheduler to the master.
Added "per-framework-principal" counters for messages from a scheduler to the master.
Review: https://reviews.apache.org/r/22316
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d19e5880
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d19e5880
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d19e5880
Branch: refs/heads/master
Commit: d19e58802fff4f62e2422cded8bfff902d131b64
Parents: 7d7d94b
Author: Jiang Yan Xu <ya...@jxu.me>
Authored: Mon Jun 9 22:05:22 2014 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Tue Jun 17 14:10:20 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/master/master.cpp | 72 ++++-
src/master/master.hpp | 50 +++
src/tests/rate_limiting_tests.cpp | 554 +++++++++++++++++++++++++++++++++
4 files changed, 675 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ae6760..b1b7d2d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -987,6 +987,7 @@ mesos_tests_SOURCES = \
tests/monitor_tests.cpp \
tests/paths_tests.cpp \
tests/protobuf_io_tests.cpp \
+ tests/rate_limiting_tests.cpp \
tests/reconciliation_tests.cpp \
tests/registrar_tests.cpp \
tests/repair_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4a01b1a..fb5c770 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -84,6 +84,8 @@ using process::Time;
using process::Timer;
using process::UPID;
+using process::metrics::Counter;
+
using memory::shared_ptr;
namespace mesos {
@@ -750,6 +752,19 @@ void Master::exited(const UPID& pid)
void Master::visit(const MessageEvent& event)
{
+ // Increment the "message_received" counter if the message is from
+ // a framework and such a counter is configured for it.
+ // See comments for 'Master::Metrics::Frameworks' and
+ // 'Master::Frameworks::principals' for details.
+ const Option<string> principal =
+ frameworks.principals.get(event.message->from);
+ if (principal.isSome()) {
+ CHECK(metrics.frameworks.contains(principal.get()));
+ Counter messages_received =
+ metrics.frameworks.get(principal.get()).get()->messages_received;
+ ++messages_received;
+ }
+
// All messages are filtered when non-leading.
if (!elected()) {
VLOG(1) << "Dropping '" << event.message->name << "' message since "
@@ -773,6 +788,15 @@ void Master::visit(const MessageEvent& event)
}
ProtobufProcess<Master>::visit(event);
+
+ // Increment 'messages_processed' counter if it still exists.
+ // Note that it could be removed in handling
+ // 'UnregisterFrameworkMessage'.
+ if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+ Counter messages_processed =
+ metrics.frameworks.get(principal.get()).get()->messages_processed;
+ ++messages_processed;
+ }
}
@@ -3567,6 +3591,30 @@ void Master::addFramework(Framework* framework)
allocator->frameworkAdded(
framework->id, framework->info, framework->resources);
+
+ // Export framework metrics.
+
+ // If the framework is authenticated, its principal should be in
+ // 'authenticated'. Otherwise look if it's supplied in
+ // FrameworkInfo.
+ Option<string> principal = authenticated.get(framework->pid);
+ if (principal.isNone() && framework->info.has_principal()) {
+ principal = framework->info.principal();
+ }
+
+ // Export framework metrics if a principal is specified.
+ if (principal.isSome()) {
+ CHECK(!frameworks.principals.contains(framework->pid));
+ frameworks.principals.put(framework->pid, principal.get());
+
+ // Create new framework metrics if this framework is the first
+ // one of this principal. Otherwise existing metrics are reused.
+ if (!metrics.frameworks.contains(principal.get())) {
+ metrics.frameworks.put(
+ principal.get(),
+ Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
+ }
+ }
}
@@ -3574,7 +3622,7 @@ void Master::addFramework(Framework* framework)
// event of a scheduler failover.
void Master::failoverFramework(Framework* framework, const UPID& newPid)
{
- const UPID& oldPid = framework->pid;
+ const UPID oldPid = framework->pid;
// There are a few failover cases to consider:
// 1. The pid has changed. In this case we definitely want to
@@ -3622,6 +3670,13 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
offer->framework_id(), offer->slave_id(), offer->resources());
removeOffer(offer);
}
+
+ // 'Failover' the framework's metrics. i.e., change the lookup key
+ // for its metrics to 'newPid'.
+ if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
+ frameworks.principals[newPid] = frameworks.principals[oldPid];
+ frameworks.principals.erase(oldPid);
+ }
}
@@ -3702,7 +3757,20 @@ void Master::removeFramework(Framework* framework)
// Remove the framework from authenticated.
authenticated.erase(framework->pid);
- // Remove it.
+ // Remove the framework's message counters.
+ const Option<string> principal = frameworks.principals.get(framework->pid);
+ if (principal.isSome()) {
+ frameworks.principals.erase(framework->pid);
+
+ // Remove the metrics for the principal if this framework is the
+ // last one with this principal.
+ if (!frameworks.principals.containsValue(principal.get())) {
+ CHECK(metrics.frameworks.contains(principal.get()));
+ metrics.frameworks.erase(principal.get());
+ }
+ }
+
+ // Remove the framework.
frameworks.activated.erase(framework->id);
allocator->frameworkRemoved(framework->id);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7a12185..d55c4f5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -484,6 +484,16 @@ private:
hashmap<FrameworkID, Framework*> activated;
boost::circular_buffer<memory::shared_ptr<Framework> > completed;
+
+ // Principals of frameworks keyed by PID.
+ // NOTE: Multiple PIDs can map to the same principal. The
+ // differences between this map and 'authenticated' are:
+ // 1) This map only includes *registered* frameworks. The mapping
+ // is added when a framework (re-)registers.
+ // 2) This map includes unauthenticated frameworks (when Master
+ // allows them) if they have principals specified in
+ // FrameworkInfo.
+ hashmap<process::UPID, std::string> principals;
} frameworks;
hashmap<OfferID, Offer*> offers;
@@ -544,6 +554,46 @@ private:
// Message counters.
process::metrics::Counter dropped_messages;
+ // Metrics specific to frameworks of a common principal.
+ // These metrics have names prefixed by "frameworks/<principal>/".
+ struct Frameworks
+ {
+ // Counters for messages from all frameworks of this principal.
+ // Note: We only count messages from active scheduler
+ // *instances* while they are *registered*. i.e., messages
+ // prior to the completion of (re)registration
+ // (AuthenticateMessage and (Re)RegisterFrameworkMessage) and
+ // messages from an inactive scheduler instance (after the
+ // framework has failed over) are not counted.
+
+ // Framework messages received (before processing).
+ process::metrics::Counter messages_received;
+
+ // Framework messages processed.
+ // NOTE: This doesn't include dropped messages. Also due to
+ // Master's asynchronous nature, this doesn't necessarily mean
+ // the work requested by this message has finished.
+ process::metrics::Counter messages_processed;
+
+ explicit Frameworks(const std::string& principal)
+ : messages_received("frameworks/" + principal + "/messages_received"),
+ messages_processed("frameworks/" + principal + "/messages_processed")
+ {
+ process::metrics::add(messages_received);
+ process::metrics::add(messages_processed);
+ }
+
+ ~Frameworks()
+ {
+ process::metrics::remove(messages_received);
+ process::metrics::remove(messages_processed);
+ }
+ };
+
+ // Per-framework-principal metrics keyed by the framework
+ // principal.
+ hashmap<std::string, process::Owned<Frameworks> > frameworks;
+
// Messages from schedulers.
process::metrics::Counter messages_register_framework;
process::metrics::Counter messages_reregister_framework;
http://git-wip-us.apache.org/repos/asf/mesos/blob/d19e5880/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
new file mode 100644
index 0000000..baf912e
--- /dev/null
+++ b/src/tests/rate_limiting_tests.cpp
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include "master/allocator.hpp"
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::master::allocator::AllocatorProcess;
+
+using process::metrics::internal::MetricsProcess;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtLeast;
+using testing::Eq;
+using testing::Return;
+
+// This test case covers tests related to framework API rate limiting
+// which includes metrics exporting for API call rates.
+class RateLimitingTest : public MesosTest {};
+
+
+// Verify that message counters for a framework are added when a
+// framework registers and removed when it terminates.
+TEST_F(RateLimitingTest, FrameworkMessageCounterMetrics)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Message counters not present before the framework registers.
+ {
+ // TODO(xujyan): It would be nice to refactor out the common
+ // metrics snapshot querying logic into a helper method/MACRO.
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ MockScheduler sched;
+ // Create MesosSchedulerDriver on the heap because of the need to
+ // destroy it during the test due to MESOS-1456.
+ MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ ASSERT_EQ(DRIVER_RUNNING, driver->start());
+
+ AWAIT_READY(registered);
+
+ // Message counters added after the framework is registered.
+ {
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ Future<Nothing> frameworkRemoved =
+ FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+ driver->stop();
+ driver->join();
+ delete driver;
+
+ AWAIT_READY(frameworkRemoved);
+
+ // Message counter removed after the framework is unregistered.
+ {
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ Shutdown();
+}
+
+
+// Verify that framework message counters work with frameworks of
+// different principals.
+TEST_F(RateLimitingTest, FrameworkMessageCountersMultipleFrameworks)
+{
+ // 1. Register two frameworks.
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo1.set_principal("framework1");
+
+ MockScheduler sched1;
+ // Create MesosSchedulerDriver on the heap because of the need to
+ // destroy it during the test due to MESOS-1456.
+ MesosSchedulerDriver* driver1 =
+ new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get());
+
+ {
+ Future<Nothing> registered;
+ EXPECT_CALL(sched1, registered(driver1, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+ AWAIT_READY(registered);
+ }
+
+ FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo2.set_principal("framework2");
+
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+
+ {
+ Future<Nothing> registered;
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+ AWAIT_READY(registered);
+ }
+
+ // 2. Verify that both frameworks have message counters added.
+ {
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework1/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework1/messages_processed"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_processed"));
+ }
+
+ // 3. Remove a framework.
+ Future<Nothing> frameworkRemoved =
+ FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+ driver1->stop();
+ driver1->join();
+ delete driver1;
+
+ AWAIT_READY(frameworkRemoved);
+
+ // 4. Its message counters are deleted while the other framework's
+ // counters stay.
+ {
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/framework1/messages_received"));
+ EXPECT_EQ(
+ 0u, metrics.values.count("frameworks/framework1/messages_processed"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/framework2/messages_processed"));
+ }
+
+ driver2.stop();
+ driver2.join();
+
+ Shutdown();
+}
+
+
+// Verify that if multiple frameworks use the same principal, they
+// share the same counters and removing one framework doesn't remove
+// the counters.
+TEST_F(RateLimitingTest, FrameworkMessageCountersSamePrincipalFrameworks)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockScheduler sched1;
+ // Create MesosSchedulerDriver on the heap because of the need to
+ // destroy it during the test due to MESOS-1456.
+ MesosSchedulerDriver* driver1 = new MesosSchedulerDriver(
+ &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ {
+ Future<Nothing> registered;
+ EXPECT_CALL(sched1, registered(driver1, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ ASSERT_EQ(DRIVER_RUNNING, driver1->start());
+
+ AWAIT_READY(registered);
+ }
+
+ // 'sched2' uses the same principal "test-principal".
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(
+ &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ {
+ Future<Nothing> registered;
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ ASSERT_EQ(DRIVER_RUNNING, driver2.start());
+
+ AWAIT_READY(registered);
+ }
+
+ // Message counters added after both frameworks are registered.
+ {
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ Future<Nothing> frameworkRemoved =
+ FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+ driver1->stop();
+ driver1->join();
+ delete driver1;
+
+ AWAIT_READY(frameworkRemoved);
+
+ // Message counters are not removed after the first framework is
+ // unregistered.
+ {
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_received"));
+ EXPECT_EQ(
+ 1u, metrics.values.count("frameworks/"+ DEFAULT_CREDENTIAL.principal() +
+ "/messages_processed"));
+ }
+
+ driver2.stop();
+ driver2.join();
+
+ Shutdown();
+}
+
+
+// Verify that when a scheduler fails over, the new scheduler
+// instance continues to use the same counters.
+TEST_F(RateLimitingTest, SchedulerMessageCounterFailover)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // 1. Launch the first (i.e., failing) scheduler and verify its
+ // counters.
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched1, registered(&driver1, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ {
+ // Grab this message so we can resend it.
+ Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF(
+ RegisterFrameworkMessage(), _, master.get());
+
+ // Grab this message to get the scheduler's pid.
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ driver1.start();
+
+ AWAIT_READY(registerFrameworkMessage);
+ AWAIT_READY(frameworkRegisteredMessage);
+ AWAIT_READY(frameworkId);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ // Send a duplicate RegisterFrameworkMessage. Master replies
+ // with a duplicate FrameworkRegisteredMessage.
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ process::post(schedulerPid, master.get(), registerFrameworkMessage.get());
+
+ // Now one message has been received and processed by Master in
+ // addition to the RegisterFrameworkMessage.
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Settle to make sure message_processed counters are updated.
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ // Verify the message counters.
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ // One message received and processed after the framework is
+ // registered.
+ const string& messages_received =
+ "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ // 2. Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler and verify that
+ // its counters are not reset to zero.
+
+ MockScheduler sched2;
+
+ FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+ framework2 = DEFAULT_FRAMEWORK_INFO;
+ framework2.mutable_id()->MergeFrom(frameworkId.get());
+
+ MesosSchedulerDriver driver2(
+ &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+ // Scheduler driver ignores duplicate FrameworkRegisteredMessages.
+ EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
+ .Times(1);
+
+ Future<Nothing> sched1Error;
+ EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+ .WillOnce(FutureSatisfy(&sched1Error));
+
+ {
+ // Grab this message to get the scheduler's pid and to make sure we
+ // wait until the framework is registered.
+ Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
+ Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
+
+ // Grab this message so we can resend it.
+ Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+ FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+
+ driver2.start();
+
+ AWAIT_READY(reregisterFrameworkMessage);
+ AWAIT_READY(sched1Error);
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
+
+ Future<process::Message> duplicateFrameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()),
+ master.get(),
+ _);
+
+ // Sending a duplicate ReregisterFrameworkMessage to test the
+ // message counters with the new scheduler instance.
+ process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get());
+
+ AWAIT_READY(duplicateFrameworkRegisteredMessage);
+
+ // Settle to make sure message_processed counters are updated.
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ Future<process::http::Response> response =
+ process::http::get(MetricsProcess::instance()->self(), "snapshot");
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ(
+ "application/json",
+ response.get().headers.get("Content-Type"));
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(parse);
+
+ JSON::Object metrics = parse.get();
+
+ // Another message after sched2 is reregistered plus the one from
+ // the sched1.
+ const string& messages_received =
+ "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_received";
+ EXPECT_EQ(1u, metrics.values.count(messages_received));
+ EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value);
+
+ const string& messages_processed =
+ "frameworks/"+ DEFAULT_CREDENTIAL.principal() + "/messages_processed";
+ EXPECT_EQ(1u, metrics.values.count(messages_processed));
+ EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value);
+ }
+
+ EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver2.join());
+
+ EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
+ EXPECT_EQ(DRIVER_STOPPED, driver1.join());
+
+ Shutdown();
+}