You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/04 08:27:33 UTC

[mesos] 01/02: Converted sending operator API events to synchronous authorization.

This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f2a0eace106692860d1df3c282481f9855d3d629
Author: Andrei Sekretenko <as...@apache.org>
AuthorDate: Thu Feb 27 21:06:21 2020 +0100

    Converted sending operator API events to synchronous authorization.
    
    Now that `ObjectApprovers` are indefinitely valid, we can store
    them in each Subscriber for its whole lifetime, instead of creating
    them for each event sent to each subscriber.
    
    Review: https://reviews.apache.org/r/72178
---
 src/master/http.cpp             |   2 +-
 src/master/master.cpp           |  35 +-----
 src/master/master.hpp           |  23 +---
 src/master/readonly_handler.cpp |   3 +-
 src/tests/api_tests.cpp         | 242 ----------------------------------------
 5 files changed, 14 insertions(+), 291 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index f1be402..c51d9fd 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2454,7 +2454,7 @@ void Master::Http::processRequestsBatch() const
 
     postProcessing.state.visit(
         [&](const ReadOnlyHandler::PostProcessing::Subscribe& s) {
-          master->subscribe(s.connection, s.principal);
+          master->subscribe(s.connection, s.approvers);
         });
   }
 }
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7662c56..e05fc8f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -12239,6 +12239,9 @@ void Master::Subscribers::send(
   VLOG(1) << "Notifying all active subscribers about " << event.type()
           << " event";
 
+  // TODO(asekretenko): Now that we have synchronous authorization,
+  // we can get rid of Shared in this code.
+
   // Create a single copy of the event for all subscribers to share.
   Shared<mesos::master::Event> sharedEvent(
       new mesos::master::Event(std::move(event)));
@@ -12251,39 +12254,13 @@ void Master::Subscribers::send(
   Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
 
   foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
-    subscriber->getApprovers(
-        master->authorizer,
-        {VIEW_ROLE, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
-      .then(defer(
-          master->self(),
-          [=](const Owned<ObjectApprovers>& approvers) {
-            subscriber->send(
-                sharedEvent,
-                approvers,
-                sharedFrameworkInfo,
-                sharedTask);
-
-            return Nothing();
-          }));
+    subscriber->send(sharedEvent, sharedFrameworkInfo, sharedTask);
   }
 }
 
 
-Future<Owned<ObjectApprovers>> Master::Subscribers::Subscriber::getApprovers(
-    const Option<Authorizer*>& authorizer,
-    std::initializer_list<authorization::Action> actions)
-{
-  Future<Owned<ObjectApprovers>> approvers =
-    ObjectApprovers::create(authorizer, principal, actions);
-
-  return approversSequence.add<Owned<ObjectApprovers>>(
-      [approvers] { return approvers; });
-}
-
-
 void Master::Subscribers::Subscriber::send(
     const Shared<mesos::master::Event>& event,
-    const Owned<ObjectApprovers>& approvers,
     const Shared<FrameworkInfo>& frameworkInfo,
     const Shared<Task>& task)
 {
@@ -12425,7 +12402,7 @@ void Master::exited(const id::UUID& id)
 
 void Master::subscribe(
     const StreamingHttpConnection<v1::master::Event>& http,
-    const Option<Principal>& principal)
+    const Owned<ObjectApprovers>& approvers)
 {
   LOG(INFO) << "Added subscriber " << http.streamId
             << " to the list of active subscribers";
@@ -12448,7 +12425,7 @@ void Master::subscribe(
   subscribers.subscribed.set(
       http.streamId,
       Owned<Subscribers::Subscriber>(
-          new Subscribers::Subscriber{http, principal}));
+          new Subscribers::Subscriber{http, approvers}));
 
   metrics->operator_event_stream_subscribers =
     subscribers.subscribed.size();
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 34ef2f1..6d99d4f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -52,7 +52,6 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
-#include <process/sequence.hpp>
 #include <process/timer.hpp>
 
 #include <process/metrics/counter.hpp>
@@ -939,7 +938,7 @@ private:
   // Subscribes a client to the 'api/vX' endpoint.
   void subscribe(
       const StreamingHttpConnection<v1::master::Event>& http,
-      const Option<process::http::authentication::Principal>& principal);
+      const process::Owned<ObjectApprovers>& approvers);
 
   void teardown(Framework* framework);
 
@@ -1307,7 +1306,7 @@ public:
     {
       struct Subscribe
       {
-        Option<process::http::authentication::Principal> principal;
+        process::Owned<ObjectApprovers> approvers;
         StreamingHttpConnection<v1::master::Event> connection;
       };
 
@@ -2215,7 +2214,7 @@ private:
     {
       Subscriber(
           const StreamingHttpConnection<v1::master::Event>& _http,
-          const Option<process::http::authentication::Principal> _principal)
+          const process::Owned<ObjectApprovers>& _approvers)
         : http(_http),
           heartbeater(
               "subscriber " + stringify(http.streamId),
@@ -2227,23 +2226,17 @@ private:
               http,
               DEFAULT_HEARTBEAT_INTERVAL,
               DEFAULT_HEARTBEAT_INTERVAL),
-          principal(_principal) {}
+          approvers(_approvers) {}
+
 
       // Not copyable, not assignable.
       Subscriber(const Subscriber&) = delete;
       Subscriber& operator=(const Subscriber&) = delete;
 
-      // Creates object approvers. The futures returned by this method will be
-      // completed in the calling order.
-      process::Future<process::Owned<ObjectApprovers>> getApprovers(
-          const Option<Authorizer*>& authorizer,
-          std::initializer_list<authorization::Action> actions);
-
       // TODO(greggomann): Refactor this function into multiple event-specific
       // overloads. See MESOS-8475.
       void send(
           const process::Shared<mesos::master::Event>& event,
-          const process::Owned<ObjectApprovers>& approvers,
           const process::Shared<FrameworkInfo>& frameworkInfo,
           const process::Shared<Task>& task);
 
@@ -2258,11 +2251,7 @@ private:
 
       StreamingHttpConnection<v1::master::Event> http;
       ResponseHeartbeater<mesos::master::Event, v1::master::Event> heartbeater;
-      const Option<process::http::authentication::Principal> principal;
-
-      // We maintain a sequence to coordinate the creation of object approvers
-      // in order to sequentialize all events to the subscriber.
-      process::Sequence approversSequence;
+      const process::Owned<ObjectApprovers> approvers;
     };
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index e4a3134..b1336f9 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -2511,8 +2511,7 @@ pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
   http.send(heartbeatEvent);
 
   // This new subscriber needs to be added in the post-processing step.
-  Master::ReadOnlyHandler::PostProcessing::Subscribe s =
-    { approvers->principal, http };
+  Master::ReadOnlyHandler::PostProcessing::Subscribe s = {approvers, http};
 
   Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) };
 
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 21c090a..d1368a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3249,248 +3249,6 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
 }
 
 
-// Operator API events are sent using an asynchronous call chain. When
-// event-related state changes in the master before the authorizer returns, the
-// continuations which actually send the event should still have a consistent
-// view of the master state from the time when the event occurred. This test
-// forces task removal in the master before the authorizer returns in order to
-// verify that events are sent correctly in that case.
-TEST_P(MasterAPITest, EventAuthorizationDelayed)
-{
-  Clock::pause();
-
-  ContentType contentType = GetParam();
-
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-  auto executor = std::make_shared<v1::MockHTTPExecutor>();
-
-  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
-  TestContainerizer containerizer(executorId, executor);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
-
-  Try<Owned<cluster::Slave>> slave =
-    StartSlave(detector.get(), &containerizer, slaveFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(slaveFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      contentType,
-      scheduler);
-
-  AWAIT_READY(subscribed);
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  // Create an event stream after seeing first offer but before a task is
-  // launched. We should see one framework, one agent, and no tasks/executors.
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-  ASSERT_SOME(response->reader);
-
-  http::Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
-  Reader<v1::master::Event> decoder(deserializer, reader);
-
-  Future<Result<v1::master::Event>> event = decoder.read();
-  AWAIT_READY(event);
-
-  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
-  const v1::master::Response::GetState& getState =
-    event->get().subscribed().get_state();
-
-  EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
-  EXPECT_EQ(1, getState.get_agents().agents_size());
-  EXPECT_TRUE(getState.get_tasks().tasks().empty());
-  EXPECT_TRUE(getState.get_executors().executors().empty());
-
-  event = decoder.read();
-
-  AWAIT_READY(event);
-
-  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
-
-  event = decoder.read();
-  EXPECT_TRUE(event.isPending());
-
-  // When the authorizer is called, return pending futures
-  // that we can satisfy later.
-  Promise<shared_ptr<const ObjectApprover>> taskAddedApprover;
-  Promise<shared_ptr<const ObjectApprover>> updateRunningApprover;
-  Promise<shared_ptr<const ObjectApprover>> updateFinishedApprover;
-
-  Sequence approverSequence;
-
-  // Each event results in 4 calls into the authorizer.
-  // NOTE: This may change when the operator event stream code is refactored
-  // to avoid unnecessary authorizer calls. See MESOS-8475.
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .Times(4)
-    .InSequence(approverSequence)
-    .WillRepeatedly(Return(taskAddedApprover.future()));
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .Times(4)
-    .InSequence(approverSequence)
-    .WillRepeatedly(Return(updateRunningApprover.future()));
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .Times(4)
-    .InSequence(approverSequence)
-    .WillRepeatedly(Return(updateFinishedApprover.future()));
-
-  const v1::Offer& offer = offers->offers(0);
-
-  v1::AgentID slaveId(offer.agent_id());
-
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId))
-    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId));
-
-  // Capture the acknowledgement messages to the agent so that we can delay the
-  // authorizer until the master has processed the terminal acknowledgement.
-  // NOTE: These calls are in reverse order because they use `EXPECT_CALL` under
-  // the hood, and such expectations are evaluated in reverse order.
-  Future<StatusUpdateAcknowledgementMessage> acknowledgeFinished =
-    FUTURE_PROTOBUF(
-        StatusUpdateAcknowledgementMessage(),
-        master.get()->pid,
-        slave.get()->pid);
-  Future<StatusUpdateAcknowledgementMessage> acknowledgeRunning =
-    FUTURE_PROTOBUF(
-        StatusUpdateAcknowledgementMessage(),
-        master.get()->pid,
-        slave.get()->pid);
-
-  EXPECT_CALL(*executor, connected(_))
-    .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
-
-  EXPECT_CALL(*executor, subscribed(_, _));
-
-  EXPECT_CALL(*executor, launch(_, _))
-    .WillOnce(v1::executor::SendUpdateFromTask(
-        frameworkId, evolve(executorId), v1::TASK_RUNNING));
-
-  EXPECT_CALL(*executor, acknowledged(_, _))
-    .WillOnce(v1::executor::SendUpdateFromTaskID(
-        frameworkId, evolve(executorId), v1::TASK_FINISHED))
-    .WillOnce(Return());
-
-  TaskInfo task = createTask(devolve(offer), "", executorId);
-
-  mesos.send(v1::createCallAccept(
-      frameworkId,
-      offer,
-      {v1::LAUNCH({evolve(task)})}));
-
-  // Wait until the task has finished and task update acknowledgements
-  // have been processed to allow the authorizer to return.
-  AWAIT_READY(acknowledgeRunning);
-  AWAIT_READY(acknowledgeFinished);
-
-  {
-    taskAddedApprover.set(shared_ptr<const ObjectApprover>(
-        std::make_shared<AcceptingObjectApprover>()));
-
-    AWAIT_READY(event);
-
-    ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
-    ASSERT_EQ(evolve(task.task_id()),
-              event->get().task_added().task().task_id());
-  }
-
-  event = decoder.read();
-
-  {
-    updateRunningApprover.set(shared_ptr<const ObjectApprover>(
-        std::make_shared<AcceptingObjectApprover>()));
-
-    AWAIT_READY(event);
-
-    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
-    ASSERT_EQ(v1::TASK_RUNNING,
-              event->get().task_updated().state());
-    ASSERT_EQ(v1::TASK_RUNNING,
-              event->get().task_updated().status().state());
-    ASSERT_EQ(evolve(task.task_id()),
-              event->get().task_updated().status().task_id());
-  }
-
-  event = decoder.read();
-
-  {
-    updateFinishedApprover.set(shared_ptr<const ObjectApprover>(
-        std::make_shared<AcceptingObjectApprover>()));
-
-    AWAIT_READY(event);
-
-    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
-    ASSERT_EQ(v1::TASK_FINISHED,
-              event->get().task_updated().state());
-    ASSERT_EQ(v1::TASK_FINISHED,
-              event->get().task_updated().status().state());
-    ASSERT_EQ(evolve(task.task_id()),
-              event->get().task_updated().status().task_id());
-  }
-
-  EXPECT_TRUE(reader.close());
-
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .WillRepeatedly(Return(std::make_shared<AcceptingObjectApprover>()));
-
-  EXPECT_CALL(*executor, shutdown(_))
-    .Times(AtMost(1));
-
-  EXPECT_CALL(*executor, disconnected(_))
-    .Times(AtMost(1));
-}
-
-
 // This test tries to verify that a client subscribed to the 'api/v1' endpoint
 // can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and 'FRAMEWORK_REMOVED'
 // events.