You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/24 03:07:19 UTC
[1/2] mesos git commit: Fixed a master API bug for agent
re-registration after master failover.
Repository: mesos
Updated Branches:
refs/heads/master 28ab19b11 -> b4e210678
Fixed a master API bug for agent re-registration after master failover.
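When the master fails over and an operator API client subscribes to the
master before the agent re-registers, the master crashes while sending
`TASK_ADDED`. This happens because the framework's `FrameworkInfo` might
not have been added to the master yet, so the framework lookup in
`Subscribers::send()` fails its `CHECK_NOTNULL`.
This patch fixes the crash:
- Callers now pass the `FrameworkInfo` (and, for `TASK_UPDATED`, the `Task`)
  to `Subscribers::send()` explicitly.
- The `TASK_ADDED` notification moves out of `Slave::addTask()` into the
  `Framework` code, where the framework's `info` is available.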
Review: https://reviews.apache.org/r/65774/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f2ec2b28
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f2ec2b28
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f2ec2b28
Branch: refs/heads/master
Commit: f2ec2b288e823424b2efe71d62ef90101b7a863f
Parents: 28ab19b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Feb 23 18:37:12 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Feb 23 18:37:12 2018 -0800
----------------------------------------------------------------------
src/master/master.cpp | 98 ++++++++++++++++------------------------------
src/master/master.hpp | 15 +++++--
2 files changed, 46 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e7d5ac6..f2c2dbe 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10241,8 +10241,18 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
task->mutable_statuses(task->statuses_size() - 1)->clear_data();
if (sendSubscribersUpdate && !subscribers.subscribed.empty()) {
- subscribers.send(protobuf::master::event::createTaskUpdated(
- *task, task->state(), status));
+ // If the framework has been removed, the task would have already
+ // transitioned to `TASK_KILLED` by `removeFramework()`, thus
+ // `sendSubscribersUpdate` shouldn't have been set to true.
+ // TODO(chhsiao): This may be changed after MESOS-6608 is resolved.
+ Framework* framework = getFramework(task->framework_id());
+ CHECK_NOTNULL(framework);
+
+ subscribers.send(
+ protobuf::master::event::createTaskUpdated(
+ *task, task->state(), status),
+ framework->info,
+ *task);
}
LOG(INFO) << "Updating the state of task " << task->task_id()
@@ -11185,57 +11195,25 @@ static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo)
}
-void Master::Subscribers::send(mesos::master::Event&& event)
+void Master::Subscribers::send(
+ mesos::master::Event&& event,
+ const Option<FrameworkInfo>& frameworkInfo,
+ const Option<Task>& task)
{
VLOG(1) << "Notifying all active subscribers about " << event.type()
<< " event";
- Option<Shared<FrameworkInfo>> frameworkInfo;
- Option<Shared<Task>> task;
-
- // Copy metadata associated with the event if necessary, so that we capture
- // the current state before we make asynchronous calls below.
- switch (event.type()) {
- case mesos::master::Event::TASK_ADDED: {
- Framework* framework =
- master->getFramework(event.task_added().task().framework_id());
-
- CHECK_NOTNULL(framework);
-
- frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info));
- break;
- }
- case mesos::master::Event::TASK_UPDATED: {
- Framework* framework =
- master->getFramework(event.task_updated().framework_id());
-
- CHECK_NOTNULL(framework);
-
- frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info));
-
- Task* storedTask =
- framework->getTask(event.task_updated().status().task_id());
-
- CHECK_NOTNULL(storedTask);
-
- task = Shared<Task>(new Task(*storedTask));
- break;
- }
- case mesos::master::Event::FRAMEWORK_ADDED:
- case mesos::master::Event::FRAMEWORK_UPDATED:
- case mesos::master::Event::FRAMEWORK_REMOVED:
- case mesos::master::Event::AGENT_ADDED:
- case mesos::master::Event::AGENT_REMOVED:
- case mesos::master::Event::SUBSCRIBED:
- case mesos::master::Event::HEARTBEAT:
- case mesos::master::Event::UNKNOWN:
- break;
- }
-
// Create a single copy of the event for all subscribers to share.
Shared<mesos::master::Event> sharedEvent(
new mesos::master::Event(std::move(event)));
+ // Create a single copy of `FrameworkInfo` and `Task` for all
+ // subscribers to share.
+ Shared<FrameworkInfo> sharedFrameworkInfo(
+ frameworkInfo.isSome()
+ ? new FrameworkInfo(frameworkInfo.get()) : nullptr);
+ Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
+
foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
Future<Owned<AuthorizationAcceptor>> authorizeRole =
AuthorizationAcceptor::create(
@@ -11283,8 +11261,8 @@ void Master::Subscribers::send(mesos::master::Event&& event)
authorizeFramework,
authorizeTask,
authorizeExecutor,
- frameworkInfo,
- task);
+ sharedFrameworkInfo,
+ sharedTask);
return Nothing();
}));
@@ -11298,30 +11276,26 @@ void Master::Subscribers::Subscriber::send(
const Owned<AuthorizationAcceptor>& authorizeFramework,
const Owned<AuthorizationAcceptor>& authorizeTask,
const Owned<AuthorizationAcceptor>& authorizeExecutor,
- const Option<Shared<FrameworkInfo>>& frameworkInfo,
- const Option<Shared<Task>>& task)
+ const Shared<FrameworkInfo>& frameworkInfo,
+ const Shared<Task>& task)
{
switch (event->type()) {
case mesos::master::Event::TASK_ADDED: {
- CHECK_SOME(frameworkInfo);
- CHECK_NOTNULL(&frameworkInfo.get());
+ CHECK_NOTNULL(frameworkInfo.get());
if (authorizeTask->accept(
- event->task_added().task(), *frameworkInfo.get()) &&
- authorizeFramework->accept(*frameworkInfo.get())) {
+ event->task_added().task(), *frameworkInfo) &&
+ authorizeFramework->accept(*frameworkInfo)) {
http.send<mesos::master::Event, v1::master::Event>(*event);
}
break;
}
case mesos::master::Event::TASK_UPDATED: {
- CHECK_SOME(frameworkInfo);
- CHECK_NOTNULL(&frameworkInfo.get());
+ CHECK_NOTNULL(frameworkInfo.get());
+ CHECK_NOTNULL(task.get());
- CHECK_SOME(task);
- CHECK_NOTNULL(&task.get());
-
- if (authorizeTask->accept(*task.get(), *frameworkInfo.get()) &&
- authorizeFramework->accept(*frameworkInfo.get())) {
+ if (authorizeTask->accept(*task, *frameworkInfo) &&
+ authorizeFramework->accept(*frameworkInfo)) {
http.send<mesos::master::Event, v1::master::Event>(*event);
}
break;
@@ -11553,10 +11527,6 @@ void Slave::addTask(Task* task)
usedResources[frameworkId] += resources;
}
- if (!master->subscribers.subscribed.empty()) {
- master->subscribers.send(protobuf::master::event::createTaskAdded(*task));
- }
-
// Note that we use `Resources` for output as it's faster than
// logging raw protobuf data.
LOG(INFO) << "Adding task " << taskId
http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6569c9e..1fadbe6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2004,8 +2004,8 @@ private:
const process::Owned<AuthorizationAcceptor>& authorizeFramework,
const process::Owned<AuthorizationAcceptor>& authorizeTask,
const process::Owned<AuthorizationAcceptor>& authorizeExecutor,
- const Option<process::Shared<FrameworkInfo>>& frameworkInfo,
- const Option<process::Shared<Task>>& task);
+ const process::Shared<FrameworkInfo>& frameworkInfo,
+ const process::Shared<Task>& task);
~Subscriber()
{
@@ -2026,7 +2026,10 @@ private:
};
// Sends the event to all subscribers connected to the 'api/vX' endpoint.
- void send(mesos::master::Event&& event);
+ void send(
+ mesos::master::Event&& event,
+ const Option<FrameworkInfo>& frameworkInfo = None(),
+ const Option<Task>& task = None());
Master* master;
@@ -2274,6 +2277,12 @@ struct Framework
trackUnderRole(role);
}
}
+
+ if (!master->subscribers.subscribed.empty()) {
+ master->subscribers.send(
+ protobuf::master::event::createTaskAdded(*task),
+ info);
+ }
}
// Update framework to recover the resources that were previously
[2/2] mesos git commit: Added a master API test for agent
re-registration after master failover.
Posted by gr...@apache.org.
Added a master API test for agent re-registration after master failover.
This test verifies that subscribing to the 'api/v1' endpoint between a
master failover and an agent re-registration won't cause the master to
crash.
Review: https://reviews.apache.org/r/65775/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4e21067
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4e21067
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4e21067
Branch: refs/heads/master
Commit: b4e210678c04e57c2fa9f277b44f6d011da1846a
Parents: f2ec2b2
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Feb 23 18:37:17 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Feb 23 18:37:17 2018 -0800
----------------------------------------------------------------------
src/tests/api_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 170 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e21067/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 31906eb..9c172f7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -94,6 +94,7 @@ using process::Promise;
using recordio::Decoder;
+using std::set;
using std::string;
using std::tuple;
using std::vector;
@@ -2389,6 +2390,175 @@ TEST_P(MasterAPITest, Subscribe)
}
+// This test verifies that subscribing to the 'api/v1' endpoint between
+// a master failover and an agent re-registration won't cause the master
+// to crash. See MESOS-8601.
+TEST_P(MasterAPITest, MasterFailover)
+{
+ ContentType contentType = GetParam();
+
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ StandaloneMasterDetector detector(master.get()->pid);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+ ASSERT_SOME(slave);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ Owned<v1::scheduler::TestMesos> mesos(new v1::scheduler::TestMesos(
+ master.get()->pid,
+ contentType,
+ scheduler));
+
+ AWAIT_READY(subscribed);
+ const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+ // Settle the clock to get the first offer.
+ Clock::settle();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+ const v1::AgentID& agentId = offers->offers(0).agent_id();
+
+ // Launch a task using the scheduler. This should result in a
+ // `TASK_STARTING` followed by a `TASK_RUNNING` status update.
+ Future<v1::scheduler::Event::Update> taskStarting;
+ Future<v1::scheduler::Event::Update> taskRunning;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(
+ FutureArg<1>(&taskStarting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(DoAll(
+ FutureArg<1>(&taskRunning),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ v1::TaskInfo task = v1::createTask(
+ agentId,
+ offers->offers(0).resources(),
+ "sleep 1000");
+
+ mesos->send(v1::createCallAccept(
+ frameworkId,
+ offers->offers(0),
+ {v1::LAUNCH({task})}));
+
+ AWAIT_READY(taskStarting);
+ EXPECT_EQ(v1::TASK_STARTING, taskStarting->status().state());
+
+ AWAIT_READY(taskRunning);
+ EXPECT_EQ(v1::TASK_RUNNING, taskRunning->status().state());
+
+ // Stop the scheduler and shutdown the master.
+ mesos.reset();
+ master->reset();
+
+ // Restart the master.
+ master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ detector.appoint(master.get()->pid);
+
+ // Create event stream after the master failover but before the agent
+ // re-registration. We should see no framework, agent, task and
+ // executor at all.
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+ headers["Accept"] = stringify(contentType);
+
+ Future<http::Response> response = http::streaming::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, v1Call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+
+ auto deserializer =
+ lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+ Reader<v1::master::Event> decoder(
+ Decoder<v1::master::Event>(deserializer), reader);
+
+ Future<Result<v1::master::Event>> event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+ const v1::master::Response::GetState& getState =
+ event->get().subscribed().get_state();
+
+ EXPECT_EQ(0, getState.get_frameworks().frameworks_size());
+ EXPECT_EQ(0, getState.get_agents().agents_size());
+ EXPECT_EQ(0, getState.get_tasks().tasks_size());
+ EXPECT_EQ(0, getState.get_executors().executors_size());
+
+ event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Advance the clock to trigger agent re-registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // The agent re-registration should result in an `AGENT_ADDED` event
+ // and a `TASK_ADDED` event.
+ set<v1::master::Event::Type> expectedEvents =
+ {v1::master::Event::AGENT_ADDED, v1::master::Event::TASK_ADDED};
+ set<v1::master::Event::Type> observedEvents;
+
+ event = decoder.read();
+ AWAIT_READY(event);
+ observedEvents.insert(event->get().type());
+
+ event = decoder.read();
+ AWAIT_READY(event);
+ observedEvents.insert(event->get().type());
+
+ EXPECT_EQ(expectedEvents, observedEvents);
+}
+
+
// Verifies that operators subscribed to the master's operator API event
// stream only receive events that they are authorized to see.
TEST_P(MasterAPITest, EventAuthorizationFiltering)