You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/04 08:27:32 UTC

[mesos] branch master updated (1d9a157 -> 54227a3)

This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 1d9a157  Fixed build on platforms not treating initializer_list<enum> as literal.
     new f2a0eac  Converted sending operator API events to synchronous authorization.
     new 54227a3  Got rid of passing Shared<> into `Subscribers::Subscriber::send(...)`.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/master/http.cpp             |   2 +-
 src/master/master.cpp           |  91 +++++----------
 src/master/master.hpp           |  31 ++---
 src/master/readonly_handler.cpp |   3 +-
 src/tests/api_tests.cpp         | 242 ----------------------------------------
 5 files changed, 39 insertions(+), 330 deletions(-)


[mesos] 02/02: Got rid of passing Shared<> into `Subscribers::Subscriber::send(...)`.

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 54227a33b68d83b97ef1a7c14283351d45322317
Author: Andrei Sekretenko <as...@apache.org>
AuthorDate: Fri Feb 28 22:39:54 2020 +0100

    Got rid of passing Shared<> into `Subscribers::Subscriber::send(...)`.
    
    Now that operator API events are authorized synchronously,
    `Subscribers::Subscriber::send(...)` is no longer deferred,
    and copying arguments of `Subscribers::send()` into `Shared`
    becomes unnecessary.
    
    Review: https://reviews.apache.org/r/72179
---
 src/master/master.cpp | 64 ++++++++++++++++++++-------------------------------
 src/master/master.hpp |  8 +++----
 2 files changed, 29 insertions(+), 43 deletions(-)

diff --git a/src/master/master.cpp b/src/master/master.cpp
index e05fc8f..a8cca62 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -12232,63 +12232,49 @@ static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo)
 
 
 void Master::Subscribers::send(
-    mesos::master::Event&& event,
+    const mesos::master::Event& event,
     const Option<FrameworkInfo>& frameworkInfo,
     const Option<Task>& task)
 {
   VLOG(1) << "Notifying all active subscribers about " << event.type()
           << " event";
 
-  // TODO(asekretenko): Now that we have synchronous authorization,
-  // we can get rid of Shared in this code.
-
-  // Create a single copy of the event for all subscribers to share.
-  Shared<mesos::master::Event> sharedEvent(
-      new mesos::master::Event(std::move(event)));
-
-  // Create a single copy of `FrameworkInfo` and `Task` for all
-  // subscribers to share.
-  Shared<FrameworkInfo> sharedFrameworkInfo(
-      frameworkInfo.isSome()
-        ? new FrameworkInfo(frameworkInfo.get()) : nullptr);
-  Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
-
   foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
-    subscriber->send(sharedEvent, sharedFrameworkInfo, sharedTask);
+    subscriber->send(event, frameworkInfo, task);
   }
 }
 
 
 void Master::Subscribers::Subscriber::send(
-    const Shared<mesos::master::Event>& event,
-    const Shared<FrameworkInfo>& frameworkInfo,
-    const Shared<Task>& task)
+    const mesos::master::Event& event,
+    const Option<FrameworkInfo>& frameworkInfo,
+    const Option<Task>& task)
 {
-  switch (event->type()) {
+  switch (event.type()) {
     case mesos::master::Event::TASK_ADDED: {
-      CHECK_NOTNULL(frameworkInfo.get());
+      CHECK_SOME(frameworkInfo);
 
       if (approvers->approved<VIEW_TASK>(
-              event->task_added().task(), *frameworkInfo) &&
+              event.task_added().task(), *frameworkInfo) &&
           approvers->approved<VIEW_FRAMEWORK>(*frameworkInfo)) {
-        http.send(*event);
+        http.send(event);
       }
       break;
     }
     case mesos::master::Event::TASK_UPDATED: {
-      CHECK_NOTNULL(frameworkInfo.get());
-      CHECK_NOTNULL(task.get());
+      CHECK_SOME(frameworkInfo);
+      CHECK_SOME(task);
 
       if (approvers->approved<VIEW_TASK>(*task, *frameworkInfo) &&
           approvers->approved<VIEW_FRAMEWORK>(*frameworkInfo)) {
-        http.send(*event);
+        http.send(event);
       }
       break;
     }
     case mesos::master::Event::FRAMEWORK_ADDED: {
       if (approvers->approved<VIEW_FRAMEWORK>(
-              event->framework_added().framework().framework_info())) {
-        mesos::master::Event event_(*event);
+              event.framework_added().framework().framework_info())) {
+        mesos::master::Event event_(event);
         event_.mutable_framework_added()->mutable_framework()->
             mutable_allocated_resources()->Clear();
         event_.mutable_framework_added()->mutable_framework()->
@@ -12296,7 +12282,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event->framework_added().framework().allocated_resources()) {
+            event.framework_added().framework().allocated_resources()) {
           if (approvers->approved<VIEW_ROLE>(resource)) {
             event_.mutable_framework_added()->mutable_framework()->
               add_allocated_resources()->CopyFrom(resource);
@@ -12305,7 +12291,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event->framework_added().framework().offered_resources()) {
+            event.framework_added().framework().offered_resources()) {
           if (approvers->approved<VIEW_ROLE>(resource)) {
             event_.mutable_framework_added()->mutable_framework()->
               add_offered_resources()->CopyFrom(resource);
@@ -12318,8 +12304,8 @@ void Master::Subscribers::Subscriber::send(
     }
     case mesos::master::Event::FRAMEWORK_UPDATED: {
       if (approvers->approved<VIEW_FRAMEWORK>(
-              event->framework_updated().framework().framework_info())) {
-        mesos::master::Event event_(*event);
+              event.framework_updated().framework().framework_info())) {
+        mesos::master::Event event_(event);
         event_.mutable_framework_updated()->mutable_framework()->
           mutable_allocated_resources()->Clear();
         event_.mutable_framework_updated()->mutable_framework()->
@@ -12327,7 +12313,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event->framework_updated().framework().allocated_resources()) {
+            event.framework_updated().framework().allocated_resources()) {
           if (approvers->approved<VIEW_ROLE>(resource)) {
             event_.mutable_framework_updated()->mutable_framework()->
               add_allocated_resources()->CopyFrom(resource);
@@ -12336,7 +12322,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event->framework_updated().framework().offered_resources()) {
+            event.framework_updated().framework().offered_resources()) {
           if (approvers->approved<VIEW_ROLE>(resource)) {
             event_.mutable_framework_updated()->mutable_framework()->
               add_offered_resources()->CopyFrom(resource);
@@ -12349,19 +12335,19 @@ void Master::Subscribers::Subscriber::send(
     }
     case mesos::master::Event::FRAMEWORK_REMOVED: {
       if (approvers->approved<VIEW_FRAMEWORK>(
-              event->framework_removed().framework_info())) {
-        http.send(*event);
+              event.framework_removed().framework_info())) {
+        http.send(event);
       }
       break;
     }
     case mesos::master::Event::AGENT_ADDED: {
-      mesos::master::Event event_(*event);
+      mesos::master::Event event_(event);
       event_.mutable_agent_added()->mutable_agent()->
         mutable_total_resources()->Clear();
 
       foreach(
           const Resource& resource,
-          event->agent_added().agent().total_resources()) {
+          event.agent_added().agent().total_resources()) {
         if (approvers->approved<VIEW_ROLE>(resource)) {
           event_.mutable_agent_added()->mutable_agent()->add_total_resources()
             ->CopyFrom(resource);
@@ -12375,7 +12361,7 @@ void Master::Subscribers::Subscriber::send(
     case mesos::master::Event::SUBSCRIBED:
     case mesos::master::Event::HEARTBEAT:
     case mesos::master::Event::UNKNOWN:
-      http.send(*event);
+      http.send(event);
       break;
   }
 }
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6d99d4f..a57237d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2236,9 +2236,9 @@ private:
       // TODO(greggomann): Refactor this function into multiple event-specific
       // overloads. See MESOS-8475.
       void send(
-          const process::Shared<mesos::master::Event>& event,
-          const process::Shared<FrameworkInfo>& frameworkInfo,
-          const process::Shared<Task>& task);
+          const mesos::master::Event& event,
+          const Option<FrameworkInfo>& frameworkInfo,
+          const Option<Task>& task);
 
       ~Subscriber()
       {
@@ -2256,7 +2256,7 @@ private:
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
     void send(
-        mesos::master::Event&& event,
+        const mesos::master::Event& event,
         const Option<FrameworkInfo>& frameworkInfo = None(),
         const Option<Task>& task = None());
 


[mesos] 01/02: Converted sending operator API events to synchronous authorization.

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f2a0eace106692860d1df3c282481f9855d3d629
Author: Andrei Sekretenko <as...@apache.org>
AuthorDate: Thu Feb 27 21:06:21 2020 +0100

    Converted sending operator API events to synchronous authorization.
    
    Now that `ObjectApprovers` are indefinitely valid, we can store
    them in each Subscriber for its whole lifetime, instead of creating
    them for each event sent to each subscriber.
    
    Review: https://reviews.apache.org/r/72178
---
 src/master/http.cpp             |   2 +-
 src/master/master.cpp           |  35 +-----
 src/master/master.hpp           |  23 +---
 src/master/readonly_handler.cpp |   3 +-
 src/tests/api_tests.cpp         | 242 ----------------------------------------
 5 files changed, 14 insertions(+), 291 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index f1be402..c51d9fd 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2454,7 +2454,7 @@ void Master::Http::processRequestsBatch() const
 
     postProcessing.state.visit(
         [&](const ReadOnlyHandler::PostProcessing::Subscribe& s) {
-          master->subscribe(s.connection, s.principal);
+          master->subscribe(s.connection, s.approvers);
         });
   }
 }
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7662c56..e05fc8f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -12239,6 +12239,9 @@ void Master::Subscribers::send(
   VLOG(1) << "Notifying all active subscribers about " << event.type()
           << " event";
 
+  // TODO(asekretenko): Now that we have synchronous authorization,
+  // we can get rid of Shared in this code.
+
   // Create a single copy of the event for all subscribers to share.
   Shared<mesos::master::Event> sharedEvent(
       new mesos::master::Event(std::move(event)));
@@ -12251,39 +12254,13 @@ void Master::Subscribers::send(
   Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
 
   foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
-    subscriber->getApprovers(
-        master->authorizer,
-        {VIEW_ROLE, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
-      .then(defer(
-          master->self(),
-          [=](const Owned<ObjectApprovers>& approvers) {
-            subscriber->send(
-                sharedEvent,
-                approvers,
-                sharedFrameworkInfo,
-                sharedTask);
-
-            return Nothing();
-          }));
+    subscriber->send(sharedEvent, sharedFrameworkInfo, sharedTask);
   }
 }
 
 
-Future<Owned<ObjectApprovers>> Master::Subscribers::Subscriber::getApprovers(
-    const Option<Authorizer*>& authorizer,
-    std::initializer_list<authorization::Action> actions)
-{
-  Future<Owned<ObjectApprovers>> approvers =
-    ObjectApprovers::create(authorizer, principal, actions);
-
-  return approversSequence.add<Owned<ObjectApprovers>>(
-      [approvers] { return approvers; });
-}
-
-
 void Master::Subscribers::Subscriber::send(
     const Shared<mesos::master::Event>& event,
-    const Owned<ObjectApprovers>& approvers,
     const Shared<FrameworkInfo>& frameworkInfo,
     const Shared<Task>& task)
 {
@@ -12425,7 +12402,7 @@ void Master::exited(const id::UUID& id)
 
 void Master::subscribe(
     const StreamingHttpConnection<v1::master::Event>& http,
-    const Option<Principal>& principal)
+    const Owned<ObjectApprovers>& approvers)
 {
   LOG(INFO) << "Added subscriber " << http.streamId
             << " to the list of active subscribers";
@@ -12448,7 +12425,7 @@ void Master::subscribe(
   subscribers.subscribed.set(
       http.streamId,
       Owned<Subscribers::Subscriber>(
-          new Subscribers::Subscriber{http, principal}));
+          new Subscribers::Subscriber{http, approvers}));
 
   metrics->operator_event_stream_subscribers =
     subscribers.subscribed.size();
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 34ef2f1..6d99d4f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -52,7 +52,6 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
-#include <process/sequence.hpp>
 #include <process/timer.hpp>
 
 #include <process/metrics/counter.hpp>
@@ -939,7 +938,7 @@ private:
   // Subscribes a client to the 'api/vX' endpoint.
   void subscribe(
       const StreamingHttpConnection<v1::master::Event>& http,
-      const Option<process::http::authentication::Principal>& principal);
+      const process::Owned<ObjectApprovers>& approvers);
 
   void teardown(Framework* framework);
 
@@ -1307,7 +1306,7 @@ public:
     {
       struct Subscribe
       {
-        Option<process::http::authentication::Principal> principal;
+        process::Owned<ObjectApprovers> approvers;
         StreamingHttpConnection<v1::master::Event> connection;
       };
 
@@ -2215,7 +2214,7 @@ private:
     {
       Subscriber(
           const StreamingHttpConnection<v1::master::Event>& _http,
-          const Option<process::http::authentication::Principal> _principal)
+          const process::Owned<ObjectApprovers>& _approvers)
         : http(_http),
           heartbeater(
               "subscriber " + stringify(http.streamId),
@@ -2227,23 +2226,17 @@ private:
               http,
               DEFAULT_HEARTBEAT_INTERVAL,
               DEFAULT_HEARTBEAT_INTERVAL),
-          principal(_principal) {}
+          approvers(_approvers) {}
+
 
       // Not copyable, not assignable.
       Subscriber(const Subscriber&) = delete;
       Subscriber& operator=(const Subscriber&) = delete;
 
-      // Creates object approvers. The futures returned by this method will be
-      // completed in the calling order.
-      process::Future<process::Owned<ObjectApprovers>> getApprovers(
-          const Option<Authorizer*>& authorizer,
-          std::initializer_list<authorization::Action> actions);
-
       // TODO(greggomann): Refactor this function into multiple event-specific
       // overloads. See MESOS-8475.
       void send(
           const process::Shared<mesos::master::Event>& event,
-          const process::Owned<ObjectApprovers>& approvers,
           const process::Shared<FrameworkInfo>& frameworkInfo,
           const process::Shared<Task>& task);
 
@@ -2258,11 +2251,7 @@ private:
 
       StreamingHttpConnection<v1::master::Event> http;
       ResponseHeartbeater<mesos::master::Event, v1::master::Event> heartbeater;
-      const Option<process::http::authentication::Principal> principal;
-
-      // We maintain a sequence to coordinate the creation of object approvers
-      // in order to sequentialize all events to the subscriber.
-      process::Sequence approversSequence;
+      const process::Owned<ObjectApprovers> approvers;
     };
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index e4a3134..b1336f9 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -2511,8 +2511,7 @@ pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
   http.send(heartbeatEvent);
 
   // This new subscriber needs to be added in the post-processing step.
-  Master::ReadOnlyHandler::PostProcessing::Subscribe s =
-    { approvers->principal, http };
+  Master::ReadOnlyHandler::PostProcessing::Subscribe s = {approvers, http};
 
   Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) };
 
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 21c090a..d1368a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3249,248 +3249,6 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
 }
 
 
-// Operator API events are sent using an asynchronous call chain. When
-// event-related state changes in the master before the authorizer returns, the
-// continuations which actually send the event should still have a consistent
-// view of the master state from the time when the event occurred. This test
-// forces task removal in the master before the authorizer returns in order to
-// verify that events are sent correctly in that case.
-TEST_P(MasterAPITest, EventAuthorizationDelayed)
-{
-  Clock::pause();
-
-  ContentType contentType = GetParam();
-
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-  auto executor = std::make_shared<v1::MockHTTPExecutor>();
-
-  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
-  TestContainerizer containerizer(executorId, executor);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
-
-  Try<Owned<cluster::Slave>> slave =
-    StartSlave(detector.get(), &containerizer, slaveFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(slaveFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      contentType,
-      scheduler);
-
-  AWAIT_READY(subscribed);
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  // Create an event stream after seeing first offer but before a task is
-  // launched. We should see one framework, one agent, and no tasks/executors.
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-  ASSERT_SOME(response->reader);
-
-  http::Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
-  Reader<v1::master::Event> decoder(deserializer, reader);
-
-  Future<Result<v1::master::Event>> event = decoder.read();
-  AWAIT_READY(event);
-
-  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
-  const v1::master::Response::GetState& getState =
-    event->get().subscribed().get_state();
-
-  EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
-  EXPECT_EQ(1, getState.get_agents().agents_size());
-  EXPECT_TRUE(getState.get_tasks().tasks().empty());
-  EXPECT_TRUE(getState.get_executors().executors().empty());
-
-  event = decoder.read();
-
-  AWAIT_READY(event);
-
-  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
-
-  event = decoder.read();
-  EXPECT_TRUE(event.isPending());
-
-  // When the authorizer is called, return pending futures
-  // that we can satisfy later.
-  Promise<shared_ptr<const ObjectApprover>> taskAddedApprover;
-  Promise<shared_ptr<const ObjectApprover>> updateRunningApprover;
-  Promise<shared_ptr<const ObjectApprover>> updateFinishedApprover;
-
-  Sequence approverSequence;
-
-  // Each event results in 4 calls into the authorizer.
-  // NOTE: This may change when the operator event stream code is refactored
-  // to avoid unnecessary authorizer calls. See MESOS-8475.
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .Times(4)
-    .InSequence(approverSequence)
-    .WillRepeatedly(Return(taskAddedApprover.future()));
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .Times(4)
-    .InSequence(approverSequence)
-    .WillRepeatedly(Return(updateRunningApprover.future()));
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .Times(4)
-    .InSequence(approverSequence)
-    .WillRepeatedly(Return(updateFinishedApprover.future()));
-
-  const v1::Offer& offer = offers->offers(0);
-
-  v1::AgentID slaveId(offer.agent_id());
-
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId))
-    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId));
-
-  // Capture the acknowledgement messages to the agent so that we can delay the
-  // authorizer until the master has processed the terminal acknowledgement.
-  // NOTE: These calls are in reverse order because they use `EXPECT_CALL` under
-  // the hood, and such expectations are evaluated in reverse order.
-  Future<StatusUpdateAcknowledgementMessage> acknowledgeFinished =
-    FUTURE_PROTOBUF(
-        StatusUpdateAcknowledgementMessage(),
-        master.get()->pid,
-        slave.get()->pid);
-  Future<StatusUpdateAcknowledgementMessage> acknowledgeRunning =
-    FUTURE_PROTOBUF(
-        StatusUpdateAcknowledgementMessage(),
-        master.get()->pid,
-        slave.get()->pid);
-
-  EXPECT_CALL(*executor, connected(_))
-    .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
-
-  EXPECT_CALL(*executor, subscribed(_, _));
-
-  EXPECT_CALL(*executor, launch(_, _))
-    .WillOnce(v1::executor::SendUpdateFromTask(
-        frameworkId, evolve(executorId), v1::TASK_RUNNING));
-
-  EXPECT_CALL(*executor, acknowledged(_, _))
-    .WillOnce(v1::executor::SendUpdateFromTaskID(
-        frameworkId, evolve(executorId), v1::TASK_FINISHED))
-    .WillOnce(Return());
-
-  TaskInfo task = createTask(devolve(offer), "", executorId);
-
-  mesos.send(v1::createCallAccept(
-      frameworkId,
-      offer,
-      {v1::LAUNCH({evolve(task)})}));
-
-  // Wait until the task has finished and task update acknowledgements
-  // have been processed to allow the authorizer to return.
-  AWAIT_READY(acknowledgeRunning);
-  AWAIT_READY(acknowledgeFinished);
-
-  {
-    taskAddedApprover.set(shared_ptr<const ObjectApprover>(
-        std::make_shared<AcceptingObjectApprover>()));
-
-    AWAIT_READY(event);
-
-    ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
-    ASSERT_EQ(evolve(task.task_id()),
-              event->get().task_added().task().task_id());
-  }
-
-  event = decoder.read();
-
-  {
-    updateRunningApprover.set(shared_ptr<const ObjectApprover>(
-        std::make_shared<AcceptingObjectApprover>()));
-
-    AWAIT_READY(event);
-
-    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
-    ASSERT_EQ(v1::TASK_RUNNING,
-              event->get().task_updated().state());
-    ASSERT_EQ(v1::TASK_RUNNING,
-              event->get().task_updated().status().state());
-    ASSERT_EQ(evolve(task.task_id()),
-              event->get().task_updated().status().task_id());
-  }
-
-  event = decoder.read();
-
-  {
-    updateFinishedApprover.set(shared_ptr<const ObjectApprover>(
-        std::make_shared<AcceptingObjectApprover>()));
-
-    AWAIT_READY(event);
-
-    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
-    ASSERT_EQ(v1::TASK_FINISHED,
-              event->get().task_updated().state());
-    ASSERT_EQ(v1::TASK_FINISHED,
-              event->get().task_updated().status().state());
-    ASSERT_EQ(evolve(task.task_id()),
-              event->get().task_updated().status().task_id());
-  }
-
-  EXPECT_TRUE(reader.close());
-
-  EXPECT_CALL(authorizer, getApprover(_, _))
-    .WillRepeatedly(Return(std::make_shared<AcceptingObjectApprover>()));
-
-  EXPECT_CALL(*executor, shutdown(_))
-    .Times(AtMost(1));
-
-  EXPECT_CALL(*executor, disconnected(_))
-    .Times(AtMost(1));
-}
-
-
 // This test tries to verify that a client subscribed to the 'api/v1' endpoint
 // can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and 'FRAMEWORK_REMOVED'
 // events.