You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/24 03:07:20 UTC
[2/2] mesos git commit: Added a master API test for agent
re-registration after master failover.
Added a master API test for agent re-registration after master failover.
This test verifies that subscribing to the 'api/v1' endpoint between a
master failover and an agent re-registration won't cause the master to
crash.
Review: https://reviews.apache.org/r/65775/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4e21067
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4e21067
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4e21067
Branch: refs/heads/master
Commit: b4e210678c04e57c2fa9f277b44f6d011da1846a
Parents: f2ec2b2
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Feb 23 18:37:17 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Feb 23 18:37:17 2018 -0800
----------------------------------------------------------------------
src/tests/api_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 170 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e21067/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 31906eb..9c172f7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -94,6 +94,7 @@ using process::Promise;
using recordio::Decoder;
+using std::set;
using std::string;
using std::tuple;
using std::vector;
@@ -2389,6 +2390,175 @@ TEST_P(MasterAPITest, Subscribe)
}
+// This test verifies that subscribing to the 'api/v1' endpoint between
+// a master failover and an agent re-registration won't cause the master
+// to crash. See MESOS-8601.
+TEST_P(MasterAPITest, MasterFailover)
+{
+ ContentType contentType = GetParam();
+
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ StandaloneMasterDetector detector(master.get()->pid);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+ ASSERT_SOME(slave);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ Owned<v1::scheduler::TestMesos> mesos(new v1::scheduler::TestMesos(
+ master.get()->pid,
+ contentType,
+ scheduler));
+
+ AWAIT_READY(subscribed);
+ const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+ // Settle the clock to get the first offer.
+ Clock::settle();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+ const v1::AgentID& agentId = offers->offers(0).agent_id();
+
+ // Launch a task using the scheduler. This should result in a
+ // `TASK_STARTING` followed by a `TASK_RUNNING` status update.
+ Future<v1::scheduler::Event::Update> taskStarting;
+ Future<v1::scheduler::Event::Update> taskRunning;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(
+ FutureArg<1>(&taskStarting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(DoAll(
+ FutureArg<1>(&taskRunning),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ v1::TaskInfo task = v1::createTask(
+ agentId,
+ offers->offers(0).resources(),
+ "sleep 1000");
+
+ mesos->send(v1::createCallAccept(
+ frameworkId,
+ offers->offers(0),
+ {v1::LAUNCH({task})}));
+
+ AWAIT_READY(taskStarting);
+ EXPECT_EQ(v1::TASK_STARTING, taskStarting->status().state());
+
+ AWAIT_READY(taskRunning);
+ EXPECT_EQ(v1::TASK_RUNNING, taskRunning->status().state());
+
+ // Stop the scheduler and shutdown the master.
+ mesos.reset();
+ master->reset();
+
+ // Restart the master.
+ master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ detector.appoint(master.get()->pid);
+
+ // Create event stream after the master failover but before the agent
+ // re-registration. We should see no framework, agent, task and
+ // executor at all.
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+ headers["Accept"] = stringify(contentType);
+
+ Future<http::Response> response = http::streaming::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, v1Call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+
+ auto deserializer =
+ lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+ Reader<v1::master::Event> decoder(
+ Decoder<v1::master::Event>(deserializer), reader);
+
+ Future<Result<v1::master::Event>> event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+ const v1::master::Response::GetState& getState =
+ event->get().subscribed().get_state();
+
+ EXPECT_EQ(0, getState.get_frameworks().frameworks_size());
+ EXPECT_EQ(0, getState.get_agents().agents_size());
+ EXPECT_EQ(0, getState.get_tasks().tasks_size());
+ EXPECT_EQ(0, getState.get_executors().executors_size());
+
+ event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Advance the clock to trigger agent re-registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // The agent re-registration should result in an `AGENT_ADDED` event
+ // and a `TASK_ADDED` event.
+ set<v1::master::Event::Type> expectedEvents =
+ {v1::master::Event::AGENT_ADDED, v1::master::Event::TASK_ADDED};
+ set<v1::master::Event::Type> observedEvents;
+
+ event = decoder.read();
+ AWAIT_READY(event);
+ observedEvents.insert(event->get().type());
+
+ event = decoder.read();
+ AWAIT_READY(event);
+ observedEvents.insert(event->get().type());
+
+ EXPECT_EQ(expectedEvents, observedEvents);
+}
+
+
// Verifies that operators subscribed to the master's operator API event
// stream only receive events that they are authorized to see.
TEST_P(MasterAPITest, EventAuthorizationFiltering)