You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/04 08:27:33 UTC
[mesos] 01/02: Converted sending operator API events to synchronous authorization.
This is an automated email from the ASF dual-hosted git repository.
asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f2a0eace106692860d1df3c282481f9855d3d629
Author: Andrei Sekretenko <as...@apache.org>
AuthorDate: Thu Feb 27 21:06:21 2020 +0100
Converted sending operator API events to synchronous authorization.
Now that `ObjectApprovers` are indefinitely valid, we can store
them in each Subscriber for its whole lifetime, instead of creating
them for each event sent to each subscriber.
Review: https://reviews.apache.org/r/72178
---
src/master/http.cpp | 2 +-
src/master/master.cpp | 35 +-----
src/master/master.hpp | 23 +---
src/master/readonly_handler.cpp | 3 +-
src/tests/api_tests.cpp | 242 ----------------------------------------
5 files changed, 14 insertions(+), 291 deletions(-)
diff --git a/src/master/http.cpp b/src/master/http.cpp
index f1be402..c51d9fd 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2454,7 +2454,7 @@ void Master::Http::processRequestsBatch() const
postProcessing.state.visit(
[&](const ReadOnlyHandler::PostProcessing::Subscribe& s) {
- master->subscribe(s.connection, s.principal);
+ master->subscribe(s.connection, s.approvers);
});
}
}
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7662c56..e05fc8f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -12239,6 +12239,9 @@ void Master::Subscribers::send(
VLOG(1) << "Notifying all active subscribers about " << event.type()
<< " event";
+ // TODO(asekretenko): Now that we have synchronous authorization,
+ // we can get rid of Shared in this code.
+
// Create a single copy of the event for all subscribers to share.
Shared<mesos::master::Event> sharedEvent(
new mesos::master::Event(std::move(event)));
@@ -12251,39 +12254,13 @@ void Master::Subscribers::send(
Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
- subscriber->getApprovers(
- master->authorizer,
- {VIEW_ROLE, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
- .then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) {
- subscriber->send(
- sharedEvent,
- approvers,
- sharedFrameworkInfo,
- sharedTask);
-
- return Nothing();
- }));
+ subscriber->send(sharedEvent, sharedFrameworkInfo, sharedTask);
}
}
-Future<Owned<ObjectApprovers>> Master::Subscribers::Subscriber::getApprovers(
- const Option<Authorizer*>& authorizer,
- std::initializer_list<authorization::Action> actions)
-{
- Future<Owned<ObjectApprovers>> approvers =
- ObjectApprovers::create(authorizer, principal, actions);
-
- return approversSequence.add<Owned<ObjectApprovers>>(
- [approvers] { return approvers; });
-}
-
-
void Master::Subscribers::Subscriber::send(
const Shared<mesos::master::Event>& event,
- const Owned<ObjectApprovers>& approvers,
const Shared<FrameworkInfo>& frameworkInfo,
const Shared<Task>& task)
{
@@ -12425,7 +12402,7 @@ void Master::exited(const id::UUID& id)
void Master::subscribe(
const StreamingHttpConnection<v1::master::Event>& http,
- const Option<Principal>& principal)
+ const Owned<ObjectApprovers>& approvers)
{
LOG(INFO) << "Added subscriber " << http.streamId
<< " to the list of active subscribers";
@@ -12448,7 +12425,7 @@ void Master::subscribe(
subscribers.subscribed.set(
http.streamId,
Owned<Subscribers::Subscriber>(
- new Subscribers::Subscriber{http, principal}));
+ new Subscribers::Subscriber{http, approvers}));
metrics->operator_event_stream_subscribers =
subscribers.subscribed.size();
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 34ef2f1..6d99d4f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -52,7 +52,6 @@
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
-#include <process/sequence.hpp>
#include <process/timer.hpp>
#include <process/metrics/counter.hpp>
@@ -939,7 +938,7 @@ private:
// Subscribes a client to the 'api/vX' endpoint.
void subscribe(
const StreamingHttpConnection<v1::master::Event>& http,
- const Option<process::http::authentication::Principal>& principal);
+ const process::Owned<ObjectApprovers>& approvers);
void teardown(Framework* framework);
@@ -1307,7 +1306,7 @@ public:
{
struct Subscribe
{
- Option<process::http::authentication::Principal> principal;
+ process::Owned<ObjectApprovers> approvers;
StreamingHttpConnection<v1::master::Event> connection;
};
@@ -2215,7 +2214,7 @@ private:
{
Subscriber(
const StreamingHttpConnection<v1::master::Event>& _http,
- const Option<process::http::authentication::Principal> _principal)
+ const process::Owned<ObjectApprovers>& _approvers)
: http(_http),
heartbeater(
"subscriber " + stringify(http.streamId),
@@ -2227,23 +2226,17 @@ private:
http,
DEFAULT_HEARTBEAT_INTERVAL,
DEFAULT_HEARTBEAT_INTERVAL),
- principal(_principal) {}
+ approvers(_approvers) {}
+
// Not copyable, not assignable.
Subscriber(const Subscriber&) = delete;
Subscriber& operator=(const Subscriber&) = delete;
- // Creates object approvers. The futures returned by this method will be
- // completed in the calling order.
- process::Future<process::Owned<ObjectApprovers>> getApprovers(
- const Option<Authorizer*>& authorizer,
- std::initializer_list<authorization::Action> actions);
-
// TODO(greggomann): Refactor this function into multiple event-specific
// overloads. See MESOS-8475.
void send(
const process::Shared<mesos::master::Event>& event,
- const process::Owned<ObjectApprovers>& approvers,
const process::Shared<FrameworkInfo>& frameworkInfo,
const process::Shared<Task>& task);
@@ -2258,11 +2251,7 @@ private:
StreamingHttpConnection<v1::master::Event> http;
ResponseHeartbeater<mesos::master::Event, v1::master::Event> heartbeater;
- const Option<process::http::authentication::Principal> principal;
-
- // We maintain a sequence to coordinate the creation of object approvers
- // in order to sequentialize all events to the subscriber.
- process::Sequence approversSequence;
+ const process::Owned<ObjectApprovers> approvers;
};
// Sends the event to all subscribers connected to the 'api/vX' endpoint.
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index e4a3134..b1336f9 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -2511,8 +2511,7 @@ pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
http.send(heartbeatEvent);
// This new subscriber needs to be added in the post-processing step.
- Master::ReadOnlyHandler::PostProcessing::Subscribe s =
- { approvers->principal, http };
+ Master::ReadOnlyHandler::PostProcessing::Subscribe s = {approvers, http};
Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) };
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 21c090a..d1368a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3249,248 +3249,6 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
}
-// Operator API events are sent using an asynchronous call chain. When
-// event-related state changes in the master before the authorizer returns, the
-// continuations which actually send the event should still have a consistent
-// view of the master state from the time when the event occurred. This test
-// forces task removal in the master before the authorizer returns in order to
-// verify that events are sent correctly in that case.
-TEST_P(MasterAPITest, EventAuthorizationDelayed)
-{
- Clock::pause();
-
- ContentType contentType = GetParam();
-
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- auto executor = std::make_shared<v1::MockHTTPExecutor>();
-
- ExecutorID executorId = DEFAULT_EXECUTOR_ID;
- TestContainerizer containerizer(executorId, executor);
-
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
-
- slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
-
- Try<Owned<cluster::Slave>> slave =
- StartSlave(detector.get(), &containerizer, slaveFlags);
- ASSERT_SOME(slave);
-
- Clock::advance(slaveFlags.registration_backoff_factor);
-
- AWAIT_READY(slaveRegisteredMessage);
-
- EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
-
- Future<v1::scheduler::Event::Subscribed> subscribed;
- EXPECT_CALL(*scheduler, subscribed(_, _))
- .WillOnce(FutureArg<1>(&subscribed));
-
- EXPECT_CALL(*scheduler, heartbeat(_))
- .WillRepeatedly(Return()); // Ignore heartbeats.
-
- Future<v1::scheduler::Event::Offers> offers;
- EXPECT_CALL(*scheduler, offers(_, _))
- .WillOnce(FutureArg<1>(&offers));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- contentType,
- scheduler);
-
- AWAIT_READY(subscribed);
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->offers().empty());
-
- // Create an event stream after seeing first offer but before a task is
- // launched. We should see one framework, one agent, and no tasks/executors.
- v1::master::Call v1Call;
- v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
- http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
- headers["Accept"] = stringify(contentType);
-
- Future<http::Response> response = http::streaming::post(
- master.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, v1Call),
- stringify(contentType));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
- ASSERT_EQ(http::Response::PIPE, response->type);
- ASSERT_SOME(response->reader);
-
- http::Pipe::Reader reader = response->reader.get();
-
- auto deserializer =
- lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
- Reader<v1::master::Event> decoder(deserializer, reader);
-
- Future<Result<v1::master::Event>> event = decoder.read();
- AWAIT_READY(event);
-
- EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
- const v1::master::Response::GetState& getState =
- event->get().subscribed().get_state();
-
- EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
- EXPECT_EQ(1, getState.get_agents().agents_size());
- EXPECT_TRUE(getState.get_tasks().tasks().empty());
- EXPECT_TRUE(getState.get_executors().executors().empty());
-
- event = decoder.read();
-
- AWAIT_READY(event);
-
- EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
-
- event = decoder.read();
- EXPECT_TRUE(event.isPending());
-
- // When the authorizer is called, return pending futures
- // that we can satisfy later.
- Promise<shared_ptr<const ObjectApprover>> taskAddedApprover;
- Promise<shared_ptr<const ObjectApprover>> updateRunningApprover;
- Promise<shared_ptr<const ObjectApprover>> updateFinishedApprover;
-
- Sequence approverSequence;
-
- // Each event results in 4 calls into the authorizer.
- // NOTE: This may change when the operator event stream code is refactored
- // to avoid unnecessary authorizer calls. See MESOS-8475.
- EXPECT_CALL(authorizer, getApprover(_, _))
- .Times(4)
- .InSequence(approverSequence)
- .WillRepeatedly(Return(taskAddedApprover.future()));
- EXPECT_CALL(authorizer, getApprover(_, _))
- .Times(4)
- .InSequence(approverSequence)
- .WillRepeatedly(Return(updateRunningApprover.future()));
- EXPECT_CALL(authorizer, getApprover(_, _))
- .Times(4)
- .InSequence(approverSequence)
- .WillRepeatedly(Return(updateFinishedApprover.future()));
-
- const v1::Offer& offer = offers->offers(0);
-
- v1::AgentID slaveId(offer.agent_id());
-
- v1::FrameworkID frameworkId(subscribed->framework_id());
-
- EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId))
- .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId));
-
- // Capture the acknowledgement messages to the agent so that we can delay the
- // authorizer until the master has processed the terminal acknowledgement.
- // NOTE: These calls are in reverse order because they use `EXPECT_CALL` under
- // the hood, and such expectations are evaluated in reverse order.
- Future<StatusUpdateAcknowledgementMessage> acknowledgeFinished =
- FUTURE_PROTOBUF(
- StatusUpdateAcknowledgementMessage(),
- master.get()->pid,
- slave.get()->pid);
- Future<StatusUpdateAcknowledgementMessage> acknowledgeRunning =
- FUTURE_PROTOBUF(
- StatusUpdateAcknowledgementMessage(),
- master.get()->pid,
- slave.get()->pid);
-
- EXPECT_CALL(*executor, connected(_))
- .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
-
- EXPECT_CALL(*executor, subscribed(_, _));
-
- EXPECT_CALL(*executor, launch(_, _))
- .WillOnce(v1::executor::SendUpdateFromTask(
- frameworkId, evolve(executorId), v1::TASK_RUNNING));
-
- EXPECT_CALL(*executor, acknowledged(_, _))
- .WillOnce(v1::executor::SendUpdateFromTaskID(
- frameworkId, evolve(executorId), v1::TASK_FINISHED))
- .WillOnce(Return());
-
- TaskInfo task = createTask(devolve(offer), "", executorId);
-
- mesos.send(v1::createCallAccept(
- frameworkId,
- offer,
- {v1::LAUNCH({evolve(task)})}));
-
- // Wait until the task has finished and task update acknowledgements
- // have been processed to allow the authorizer to return.
- AWAIT_READY(acknowledgeRunning);
- AWAIT_READY(acknowledgeFinished);
-
- {
- taskAddedApprover.set(shared_ptr<const ObjectApprover>(
- std::make_shared<AcceptingObjectApprover>()));
-
- AWAIT_READY(event);
-
- ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
- ASSERT_EQ(evolve(task.task_id()),
- event->get().task_added().task().task_id());
- }
-
- event = decoder.read();
-
- {
- updateRunningApprover.set(shared_ptr<const ObjectApprover>(
- std::make_shared<AcceptingObjectApprover>()));
-
- AWAIT_READY(event);
-
- ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
- ASSERT_EQ(v1::TASK_RUNNING,
- event->get().task_updated().state());
- ASSERT_EQ(v1::TASK_RUNNING,
- event->get().task_updated().status().state());
- ASSERT_EQ(evolve(task.task_id()),
- event->get().task_updated().status().task_id());
- }
-
- event = decoder.read();
-
- {
- updateFinishedApprover.set(shared_ptr<const ObjectApprover>(
- std::make_shared<AcceptingObjectApprover>()));
-
- AWAIT_READY(event);
-
- ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
- ASSERT_EQ(v1::TASK_FINISHED,
- event->get().task_updated().state());
- ASSERT_EQ(v1::TASK_FINISHED,
- event->get().task_updated().status().state());
- ASSERT_EQ(evolve(task.task_id()),
- event->get().task_updated().status().task_id());
- }
-
- EXPECT_TRUE(reader.close());
-
- EXPECT_CALL(authorizer, getApprover(_, _))
- .WillRepeatedly(Return(std::make_shared<AcceptingObjectApprover>()));
-
- EXPECT_CALL(*executor, shutdown(_))
- .Times(AtMost(1));
-
- EXPECT_CALL(*executor, disconnected(_))
- .Times(AtMost(1));
-}
-
-
// This test tries to verify that a client subscribed to the 'api/v1' endpoint
// can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and 'FRAMEWORK_REMOVED'
// events.