You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/07/06 21:53:41 UTC

mesos git commit: Refactor all `get*` helper functions for master API to blocking version.

Repository: mesos
Updated Branches:
  refs/heads/master 8cbc1aeff -> b16dbb2c2


Refactor all `get*` helper functions for master API to blocking version.

This makes sure \`GetState()\` collection is done in an
atomic fashion and avoid a race condition when doing `subscribe`.

Things can be improved later:
\- \`Swap\` and RVO to avoid large data copy.
\- refactor the common logic of `ObjectApprover` into another helper
   function.
\- craft out test case for actual race condition.

Review: https://reviews.apache.org/r/49722/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b16dbb2c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b16dbb2c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b16dbb2c

Branch: refs/heads/master
Commit: b16dbb2c22faab5bdff088aef668f15a7dcf696f
Parents: 8cbc1ae
Author: Zhitao Li <zh...@gmail.com>
Authored: Wed Jul 6 16:48:31 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 6 16:53:15 2016 -0500

----------------------------------------------------------------------
 src/master/http.cpp   | 680 +++++++++++++++++++++++++--------------------
 src/master/master.hpp |  28 +-
 2 files changed, 393 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b16dbb2c/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 32b15d1..bff1fd5 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -588,41 +588,83 @@ Future<Response> Master::Http::api(
     case mesos::master::Call::REMOVE_QUOTA:
       return quotaHandler.remove(call, principal);
 
-    case mesos::master::Call::SUBSCRIBE: {
-      return _getState(principal)
-        .then(defer(master->self(),
-            [this, acceptType]
-            (const mesos::master::Response::GetState& getState)
-              -> Future<Response> {
-          // TODO(zhitao): There is a possible race condition here: if an action
-          // like `taskUpdate()` is queued between `_getState()` and this
-          // continuation, neither the event will be sent to the subscriber
-          // (because the connection is not in subscribers yet), nor
-          // the effect of the change would be captured in the snapshot.
-          Pipe pipe;
-          OK ok;
-
-          ok.headers["Content-Type"] = stringify(acceptType);
-          ok.type = Response::PIPE;
-          ok.reader = pipe.reader();
-
-          HttpConnection http {pipe.writer(), acceptType, UUID::random()};
-          master->subscribe(http);
-
-          mesos::master::Event event;
-          event.set_type(mesos::master::Event::SUBSCRIBED);
-          event.mutable_subscribed()->mutable_get_state()->CopyFrom(getState);
-          http.send<mesos::master::Event, v1::master::Event>(event);
-
-          return ok;
-        }));
-    }
+    case mesos::master::Call::SUBSCRIBE:
+      return subscribe(call, principal, acceptType);
   }
 
   UNREACHABLE();
 }
 
 
+Future<Response> Master::Http::subscribe(
+    const mesos::master::Call& call,
+    const Option<string>& principal,
+    ContentType contentType) const
+{
+  CHECK_EQ(mesos::master::Call::SUBSCRIBE, call.type());
+
+  // Retrieve Approvers for authorizing frameworks and tasks.
+  Future<Owned<ObjectApprover>> frameworksApprover;
+  Future<Owned<ObjectApprover>> tasksApprover;
+  Future<Owned<ObjectApprover>> executorsApprover;
+  if (master->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    frameworksApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_FRAMEWORK);
+
+    tasksApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_TASK);
+
+    executorsApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_EXECUTOR);
+  } else {
+    frameworksApprover = Owned<ObjectApprover>(
+      new AcceptingObjectApprover());
+    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    executorsApprover = Owned<ObjectApprover>(
+      new AcceptingObjectApprover());
+  }
+
+  return collect(frameworksApprover, tasksApprover, executorsApprover)
+    .then(defer(master->self(),
+      [=](const tuple<Owned<ObjectApprover>,
+                      Owned<ObjectApprover>,
+                      Owned<ObjectApprover>>& approvers)
+        -> Future<Response> {
+      // Get approver from tuple.
+      Owned<ObjectApprover> frameworksApprover;
+      Owned<ObjectApprover> tasksApprover;
+      Owned<ObjectApprover> executorsApprover;
+      tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
+      Pipe pipe;
+      OK ok;
+
+      ok.headers["Content-Type"] = stringify(contentType);
+      ok.type = Response::PIPE;
+      ok.reader = pipe.reader();
+
+      HttpConnection http {pipe.writer(), contentType, UUID::random()};
+      master->subscribe(http);
+
+      mesos::master::Event event;
+      event.set_type(mesos::master::Event::SUBSCRIBED);
+      event.mutable_subscribed()->mutable_get_state()->CopyFrom(
+        _getState(frameworksApprover,
+                  tasksApprover,
+                  executorsApprover));
+
+      http.send<mesos::master::Event, v1::master::Event>(event);
+
+      return ok;
+    }));
+}
+
+
 // TODO(ijimenez): Add some information or pointers to help
 // users understand the HTTP Event/Call API.
 string Master::Http::SCHEDULER_HELP()
@@ -1361,22 +1403,6 @@ Future<Response> Master::Http::getFrameworks(
 {
   CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());
 
-  return _getFrameworks(principal)
-    .then([contentType](
-        const mesos::master::Response::GetFrameworks& getFrameworks)
-          -> Future<Response> {
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_FRAMEWORKS);
-      response.mutable_get_frameworks()->CopyFrom(getFrameworks);
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-    });
-}
-
-
-Future<mesos::master::Response::GetFrameworks> Master::Http::_getFrameworks(
-    const Option<string>& principal) const
-{
   // Retrieve `ObjectApprover`s for authorizing frameworks.
   Future<Owned<ObjectApprover>> frameworksApprover;
 
@@ -1396,58 +1422,70 @@ Future<mesos::master::Response::GetFrameworks> Master::Http::_getFrameworks(
   return frameworksApprover
     .then(defer(master->self(),
         [=](const Owned<ObjectApprover>& frameworksApprover)
-          -> Future<mesos::master::Response::GetFrameworks> {
-      mesos::master::Response::GetFrameworks getFrameworks;
+          -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+      response.mutable_get_frameworks()->CopyFrom(
+          _getFrameworks(frameworksApprover));
 
-      foreachvalue (const Framework* framework,
-                    master->frameworks.registered) {
-        // Skip unauthorized frameworks.
-        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
-          continue;
-        }
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+    }));
+}
 
-        getFrameworks.add_frameworks()->CopyFrom(model(*framework));
-      }
 
-      foreach (const std::shared_ptr<Framework>& framework,
-               master->frameworks.completed) {
-        // Skip unauthorized frameworks.
-        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
-          continue;
-        }
+mesos::master::Response::GetFrameworks Master::Http::_getFrameworks(
+    const Owned<ObjectApprover>& frameworksApprover) const
+{
+  mesos::master::Response::GetFrameworks getFrameworks;
+  foreachvalue (const Framework* framework,
+                master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
 
-        getFrameworks.add_completed_frameworks()->CopyFrom(model(*framework));
-      }
+    getFrameworks.add_frameworks()->CopyFrom(model(*framework));
+  }
 
-      foreachvalue (const Slave* slave, master->slaves.registered) {
-        foreachkey (const FrameworkID& frameworkId, slave->tasks) {
-          if (!master->frameworks.registered.contains(frameworkId)) {
-            // TODO(haosdent): This logic should be simplified after
-            // a deprecation cycle starting with 1.0 as after that
-            // we can rely on `master->frameworks.recovered` containing
-            // all FrameworkInfos.
-            // Until then there are 3 cases:
-            // - No authorization enabled: show all orphaned frameworks.
-            // - Authorization enabled, but no FrameworkInfo present:
-            //   do not show orphaned frameworks.
-            // - Authorization enabled, FrameworkInfo present: filter
-            //   based on `approveViewFrameworkInfo`.
-            if (master->authorizer.isSome() &&
-               (!master->frameworks.recovered.contains(frameworkId) ||
-                !approveViewFrameworkInfo(
-                    frameworksApprover,
-                    master->frameworks.recovered[frameworkId]))) {
-              continue;
-            }
+  foreach (const std::shared_ptr<Framework>& framework,
+           master->frameworks.completed) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
 
-            getFrameworks.add_recovered_frameworks()->CopyFrom(
-                master->frameworks.recovered[frameworkId]);
-          }
+    getFrameworks.add_completed_frameworks()->CopyFrom(model(*framework));
+  }
+
+  foreachvalue (const Slave* slave, master->slaves.registered) {
+    foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+      if (!master->frameworks.registered.contains(frameworkId)) {
+        // TODO(haosdent): This logic should be simplified after
+        // a deprecation cycle starting with 1.0 as after that
+        // we can rely on `master->frameworks.recovered` containing
+        // all FrameworkInfos.
+        // Until then there are 3 cases:
+        // - No authorization enabled: show all orphaned frameworks.
+        // - Authorization enabled, but no FrameworkInfo present:
+        //   do not show orphaned frameworks.
+        // - Authorization enabled, FrameworkInfo present: filter
+        //   based on `approveViewFrameworkInfo`.
+        if (master->authorizer.isSome() &&
+           (!master->frameworks.recovered.contains(frameworkId) ||
+            !approveViewFrameworkInfo(
+                frameworksApprover,
+                master->frameworks.recovered[frameworkId]))) {
+          continue;
         }
+
+        getFrameworks.add_recovered_frameworks()->CopyFrom(
+            master->frameworks.recovered[frameworkId]);
       }
+    }
+  }
 
-      return getFrameworks;
-    }));
+  return getFrameworks;
 }
 
 
@@ -1458,23 +1496,6 @@ Future<Response> Master::Http::getExecutors(
 {
   CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());
 
-  return _getExecutors(principal)
-    .then(
-      [contentType](const mesos::master::Response::GetExecutors& getExecutors)
-        -> Future<Response> {
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_EXECUTORS);
-      response.mutable_get_executors()->CopyFrom(getExecutors);
-
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-    });
-}
-
-
-Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
-    const Option<std::string>& principal) const
-{
   // Retrieve `ObjectApprover`s for authorizing frameworks and executors.
   Future<Owned<ObjectApprover>> frameworksApprover;
   Future<Owned<ObjectApprover>> executorsApprover;
@@ -1498,95 +1519,110 @@ Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
     .then(defer(master->self(),
         [=](const tuple<Owned<ObjectApprover>,
                         Owned<ObjectApprover>>& approvers)
-          -> Future<mesos::master::Response::GetExecutors> {
+          -> Future<Response> {
       // Get approver from tuple.
       Owned<ObjectApprover> frameworksApprover;
       Owned<ObjectApprover> executorsApprover;
       tie(frameworksApprover, executorsApprover) = approvers;
 
-      // Construct framework list with both active and completed frameworks.
-      vector<const Framework*> frameworks;
-      foreachvalue (Framework* framework, master->frameworks.registered) {
-        // Skip unauthorized frameworks.
-        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
-          continue;
-        }
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_EXECUTORS);
 
-        frameworks.push_back(framework);
-      }
+      response.mutable_get_executors()->CopyFrom(
+          _getExecutors(frameworksApprover, executorsApprover));
 
-      foreach (const std::shared_ptr<Framework>& framework,
-               master->frameworks.completed) {
-        // Skip unauthorized frameworks.
-        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
-          continue;
-        }
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+    }));
+}
 
-        frameworks.push_back(framework.get());
-      }
 
-      mesos::master::Response::GetExecutors getExecutors;
+mesos::master::Response::GetExecutors Master::Http::_getExecutors(
+      const Owned<ObjectApprover>& frameworksApprover,
+      const Owned<ObjectApprover>& executorsApprover) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* framework, master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
 
-      foreach (const Framework* framework, frameworks) {
-        foreachpair (const SlaveID& slaveId,
-                     const auto& executorsMap,
-                     framework->executors) {
-          foreachvalue (const ExecutorInfo& info, executorsMap) {
-            // Skip unauthorized executors.
-            if (!approveViewExecutorInfo(executorsApprover,
-                                         info,
-                                         framework->info)) {
-              continue;
-            }
+    frameworks.push_back(framework);
+  }
 
-            mesos::master::Response::GetExecutors::Executor* executor =
-              getExecutors.add_executors();
+  foreach (const std::shared_ptr<Framework>& framework,
+           master->frameworks.completed) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
 
-            executor->mutable_executor_info()->CopyFrom(info);
-            executor->mutable_slave_id()->CopyFrom(slaveId);
-          }
+    frameworks.push_back(framework.get());
+  }
+
+  mesos::master::Response::GetExecutors getExecutors;
+
+  foreach (const Framework* framework, frameworks) {
+    foreachpair (const SlaveID& slaveId,
+                 const auto& executorsMap,
+                 framework->executors) {
+      foreachvalue (const ExecutorInfo& info, executorsMap) {
+        // Skip unauthorized executors.
+        if (!approveViewExecutorInfo(executorsApprover,
+                                     info,
+                                     framework->info)) {
+          continue;
         }
-      }
 
-      // Orphan executors.
-      foreachvalue (const Slave* slave, master->slaves.registered) {
-        typedef hashmap<ExecutorID, ExecutorInfo> ExecutorMap;
-        foreachpair (const FrameworkID& frameworkId,
-                     const ExecutorMap& executors,
-                     slave->executors) {
-          foreachvalue (const ExecutorInfo& info, executors) {
-            if (!master->frameworks.registered.contains(frameworkId)) {
-              // TODO(haosdent): This logic should be simplified after
-              // a deprecation cycle starting with 1.0 as after that
-              // we can rely on `master->frameworks.recovered` containing
-              // all FrameworkInfos.
-              // Until then there are 3 cases:
-              // - No authorization enabled: show all orphaned executors.
-              // - Authorization enabled, but no FrameworkInfo present:
-              //   do not show orphaned executors.
-              // - Authorization enabled, FrameworkInfo present: filter
-              //   based on `approveViewExecutorInfo`.
-              if (master->authorizer.isSome() &&
-                 (!master->frameworks.recovered.contains(frameworkId) ||
-                  !approveViewExecutorInfo(
-                      executorsApprover,
-                      info,
-                      master->frameworks.recovered[frameworkId]))) {
-                continue;
-              }
+        mesos::master::Response::GetExecutors::Executor* executor =
+          getExecutors.add_executors();
 
-              mesos::master::Response::GetExecutors::Executor* executor =
-                getExecutors.add_orphan_executors();
+        executor->mutable_executor_info()->CopyFrom(info);
+        executor->mutable_slave_id()->CopyFrom(slaveId);
+      }
+    }
+  }
 
-              executor->mutable_executor_info()->CopyFrom(info);
-              executor->mutable_slave_id()->CopyFrom(slave->id);
-            }
+  // Orphan executors.
+  foreachvalue (const Slave* slave, master->slaves.registered) {
+    typedef hashmap<ExecutorID, ExecutorInfo> ExecutorMap;
+    foreachpair (const FrameworkID& frameworkId,
+                 const ExecutorMap& executors,
+                 slave->executors) {
+      foreachvalue (const ExecutorInfo& info, executors) {
+        if (!master->frameworks.registered.contains(frameworkId)) {
+          // TODO(haosdent): This logic should be simplified after
+          // a deprecation cycle starting with 1.0 as after that
+          // we can rely on `master->frameworks.recovered` containing
+          // all FrameworkInfos.
+          // Until then there are 3 cases:
+          // - No authorization enabled: show all orphaned executors.
+          // - Authorization enabled, but no FrameworkInfo present:
+          //   do not show orphaned executors.
+          // - Authorization enabled, FrameworkInfo present: filter
+          //   based on `approveViewExecutorInfo`.
+          if (master->authorizer.isSome() &&
+             (!master->frameworks.recovered.contains(frameworkId) ||
+              !approveViewExecutorInfo(
+                  executorsApprover,
+                  info,
+                  master->frameworks.recovered[frameworkId]))) {
+            continue;
           }
+
+          mesos::master::Response::GetExecutors::Executor* executor =
+            getExecutors.add_orphan_executors();
+
+          executor->mutable_executor_info()->CopyFrom(info);
+          executor->mutable_slave_id()->CopyFrom(slave->id);
         }
       }
+    }
+  }
 
-      return getExecutors;
-    }));
+  return getExecutors;
 }
 
 
@@ -1597,44 +1633,79 @@ Future<Response> Master::Http::getState(
 {
   CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
 
-  return _getState(principal)
-    .then([contentType](const mesos::master::Response::GetState& getState)
-      -> Future<Response> {
+  // Retrieve Approvers for authorizing frameworks and tasks.
+  Future<Owned<ObjectApprover>> frameworksApprover;
+  Future<Owned<ObjectApprover>> tasksApprover;
+  Future<Owned<ObjectApprover>> executorsApprover;
+  if (master->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    frameworksApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_FRAMEWORK);
+
+    tasksApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_TASK);
+
+    executorsApprover = master->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_EXECUTOR);
+  } else {
+    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
+
+  return collect(frameworksApprover, tasksApprover, executorsApprover)
+    .then(defer(master->self(),
+      [=](const tuple<Owned<ObjectApprover>,
+                      Owned<ObjectApprover>,
+                      Owned<ObjectApprover>>& approvers)
+        -> Future<Response> {
+      // Get approver from tuple.
+      Owned<ObjectApprover> frameworksApprover;
+      Owned<ObjectApprover> tasksApprover;
+      Owned<ObjectApprover> executorsApprover;
+      tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
       mesos::master::Response response;
       response.set_type(mesos::master::Response::GET_STATE);
-      response.mutable_get_state()->CopyFrom(getState);
+      response.mutable_get_state()->CopyFrom(
+          _getState(frameworksApprover,
+                    tasksApprover,
+                    executorsApprover));
 
       return OK(serialize(contentType, evolve(response)),
                 stringify(contentType));
-    });
+    }));
 }
 
 
-Future<mesos::master::Response::GetState> Master::Http::_getState(
-    const Option<string>& principal) const
+mesos::master::Response::GetState Master::Http::_getState(
+    const Owned<ObjectApprover>& frameworksApprover,
+    const Owned<ObjectApprover>& tasksApprover,
+    const Owned<ObjectApprover>& executorsApprover) const
 {
-  return collect(
-      _getTasks(principal),
-      _getExecutors(principal),
-      _getFrameworks(principal),
-      _getAgents(principal))
-    .then(defer(master->self(),
-      [](const tuple<mesos::master::Response::GetTasks,
-                     mesos::master::Response::GetExecutors,
-                     mesos::master::Response::GetFrameworks,
-                     mesos::master::Response::GetAgents>& results)
-        -> mesos::master::Response::GetState {
-      mesos::master::Response::GetState getState;
-
-      // Use std::get instead of std::tie to avoid
-      // unnecessary copy of large data structs.
-      getState.mutable_get_tasks()->CopyFrom(std::get<0>(results));
-      getState.mutable_get_executors()->CopyFrom(std::get<1>(results));
-      getState.mutable_get_frameworks()->CopyFrom(std::get<2>(results));
-      getState.mutable_get_agents()->CopyFrom(std::get<3>(results));
-
-      return getState;
-    }));
+  // NOTE: This function must be blocking instead of returning a
+  // `Future`. This is because `subscribe()` needs to atomically
+  // add subscriber to `subscribers` map and send the captured state
+  // in `SUBSCRIBED` without being interleaved by any other events.
+
+  mesos::master::Response::GetState getState;
+
+  getState.mutable_get_tasks()->CopyFrom(
+    _getTasks(frameworksApprover, tasksApprover));
+
+  getState.mutable_get_executors()->CopyFrom(
+    _getExecutors(frameworksApprover, executorsApprover));
+
+  getState.mutable_get_frameworks()->CopyFrom(
+    _getFrameworks(frameworksApprover));
+
+  getState.mutable_get_agents()->CopyFrom(_getAgents());
+
+  return getState;
 }
 
 
@@ -2210,21 +2281,16 @@ Future<process::http::Response> Master::Http::getAgents(
 {
   CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());
 
-  return _getAgents(principal)
-    .then([contentType](const mesos::master::Response::GetAgents& getAgents)
-      -> Future<Response> {
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_AGENTS);
-      response.mutable_get_agents()->CopyFrom(getAgents);
+  mesos::master::Response response;
+  response.set_type(mesos::master::Response::GET_AGENTS);
+  response.mutable_get_agents()->CopyFrom(_getAgents());
 
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-  });
+  return OK(serialize(contentType, evolve(response)),
+            stringify(contentType));
 }
 
 
-Future<mesos::master::Response::GetAgents> Master::Http::_getAgents(
-    const Option<string>& principal) const
+mesos::master::Response::GetAgents Master::Http::_getAgents() const
 {
   mesos::master::Response::GetAgents getAgents;
   foreachvalue (const Slave* slave, master->slaves.registered) {
@@ -3523,20 +3589,6 @@ Future<Response> Master::Http::getTasks(
 {
   CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
 
-  return _getTasks(principal)
-    .then([contentType](const mesos::master::Response::GetTasks& getTasks)
-        -> Future<Response> {
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_TASKS);
-      response.mutable_get_tasks()->CopyFrom(getTasks);
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-  });
-}
-
-
-Future<mesos::master::Response::GetTasks> Master::Http::_getTasks(
-    const Option<string>& principal) const {
   // Retrieve Approvers for authorizing frameworks and tasks.
   Future<Owned<ObjectApprover>> frameworksApprover;
   Future<Owned<ObjectApprover>> tasksApprover;
@@ -3560,105 +3612,123 @@ Future<mesos::master::Response::GetTasks> Master::Http::_getTasks(
     .then(defer(master->self(),
       [=](const tuple<Owned<ObjectApprover>,
                       Owned<ObjectApprover>>& approvers)
-        -> Future<mesos::master::Response::GetTasks> {
+        -> Future<Response> {
       // Get approver from tuple.
       Owned<ObjectApprover> frameworksApprover;
       Owned<ObjectApprover> tasksApprover;
       tie(frameworksApprover, tasksApprover) = approvers;
 
-      // Construct framework list with both active and completed frameworks.
-      vector<const Framework*> frameworks;
-      foreachvalue (Framework* framework, master->frameworks.registered) {
-        // Skip unauthorized frameworks.
-        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
-          continue;
-        }
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_TASKS);
 
-        frameworks.push_back(framework);
-      }
+      response.mutable_get_tasks()->CopyFrom(
+          _getTasks(frameworksApprover,
+                    tasksApprover));
 
-      foreach (const std::shared_ptr<Framework>& framework,
-               master->frameworks.completed) {
-        // Skip unauthorized frameworks.
-        if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
-          continue;
-        }
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+  }));
+}
 
-        frameworks.push_back(framework.get());
-      }
 
-      mesos::master::Response::GetTasks getTasks;
-      vector<const Task*> tasks;
-      foreach (const Framework* framework, frameworks) {
-        // Pending tasks.
-        foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-          // Skip unauthorized tasks.
-          if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
-            continue;
-          }
+mesos::master::Response::GetTasks Master::Http::_getTasks(
+    const Owned<ObjectApprover>& frameworksApprover,
+    const Owned<ObjectApprover>& tasksApprover) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* framework, master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
 
-          const Task& task =
-            protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+    frameworks.push_back(framework);
+  }
 
-          getTasks.add_pending_tasks()->CopyFrom(task);
-        }
+  foreach (const std::shared_ptr<Framework>& framework,
+           master->frameworks.completed) {
+    // Skip unauthorized frameworks.
+    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+      continue;
+    }
 
-        // Active tasks.
-        foreachvalue (Task* task, framework->tasks) {
-          CHECK_NOTNULL(task);
-          // Skip unauthorized tasks.
-          if (!approveViewTask(tasksApprover, *task, framework->info)) {
-            continue;
-          }
+    frameworks.push_back(framework.get());
+  }
 
-          getTasks.add_tasks()->CopyFrom(*task);
-        }
+  mesos::master::Response::GetTasks getTasks;
 
-        // Completed tasks.
-        foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
-          // Skip unauthorized tasks.
-          if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
-            continue;
-          }
+  vector<const Task*> tasks;
+  foreach (const Framework* framework, frameworks) {
+    // Pending tasks.
+    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
+      // Skip unauthorized tasks.
+      if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+        continue;
+      }
 
-          getTasks.add_completed_tasks()->CopyFrom(*task);
-        }
+      const Task& task =
+        protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+      getTasks.add_pending_tasks()->CopyFrom(task);
+    }
+
+    // Active tasks.
+    foreachvalue (Task* task, framework->tasks) {
+      CHECK_NOTNULL(task);
+      // Skip unauthorized tasks.
+      if (!approveViewTask(tasksApprover, *task, framework->info)) {
+        continue;
       }
 
-      // Orphan tasks.
-      foreachvalue (const Slave* slave, master->slaves.registered) {
-        typedef hashmap<TaskID, Task*> TaskMap;
-        foreachvalue (const TaskMap& tasks, slave->tasks) {
-          foreachvalue (const Task* task, tasks) {
-            CHECK_NOTNULL(task);
-            const FrameworkID& frameworkId = task->framework_id();
-            if (!master->frameworks.registered.contains(frameworkId)) {
-              // TODO(joerg84): This logic should be simplified after
-              // a deprecation cycle starting with 1.0 as after that
-              // we can rely on `master->frameworks.recovered` containing
-              // all FrameworkInfos.
-              // Until then there are 3 cases:
-              // - No authorization enabled: show all orphaned tasks.
-              // - Authorization enabled, but no FrameworkInfo present:
-              //   do not show orphaned tasks.
-              // - Authorization enabled, FrameworkInfo present: filter
-              //   based on `approveViewTask`.
-              if (master->authorizer.isSome() &&
-                 (!master->frameworks.recovered.contains(frameworkId) ||
-                  !approveViewTask(
-                      tasksApprover,
-                      *task,
-                      master->frameworks.recovered[frameworkId]))) {
-                continue;
-              }
+      getTasks.add_tasks()->CopyFrom(*task);
+    }
 
-              getTasks.add_orphan_tasks()->CopyFrom(*task);
+    // Completed tasks.
+    foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
+      // Skip unauthorized tasks.
+      if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
+        continue;
+      }
+
+      getTasks.add_completed_tasks()->CopyFrom(*task);
+    }
+
+    // Orphan tasks.
+    foreachvalue (const Slave* slave, master->slaves.registered) {
+      typedef hashmap<TaskID, Task*> TaskMap;
+      foreachvalue (const TaskMap& tasks, slave->tasks) {
+        foreachvalue (const Task* task, tasks) {
+          CHECK_NOTNULL(task);
+          const FrameworkID& frameworkId = task->framework_id();
+          if (!master->frameworks.registered.contains(frameworkId)) {
+            // TODO(joerg84): This logic should be simplified after
+            // a deprecation cycle starting with 1.0 as after that
+            // we can rely on `master->frameworks.recovered` containing
+            // all FrameworkInfos.
+            // Until then there are 3 cases:
+            // - No authorization enabled: show all orphaned tasks.
+            // - Authorization enabled, but no FrameworkInfo present:
+            //   do not show orphaned tasks.
+            // - Authorization enabled, FrameworkInfo present: filter
+            //   based on `approveViewTask`.
+            if (master->authorizer.isSome() &&
+               (!master->frameworks.recovered.contains(frameworkId) ||
+                !approveViewTask(
+                    tasksApprover,
+                    *task,
+                    master->frameworks.recovered[frameworkId]))) {
+              continue;
             }
+
+            getTasks.add_orphan_tasks()->CopyFrom(*task);
           }
         }
       }
-      return getTasks;
-  }));
+    }
+  }
+
+  return getTasks;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b16dbb2c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0688ba3..60efd22 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1388,8 +1388,7 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
-    process::Future<mesos::master::Response::GetAgents> _getAgents(
-        const Option<std::string>& principal) const;
+    mesos::master::Response::GetAgents _getAgents() const;
 
     process::Future<process::http::Response> getFlags(
         const mesos::master::Call& call,
@@ -1461,8 +1460,9 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
-    process::Future<mesos::master::Response::GetTasks> _getTasks(
-        const Option<std::string>& principal) const;
+    mesos::master::Response::GetTasks _getTasks(
+        const process::Owned<ObjectApprover>& frameworksApprover,
+        const process::Owned<ObjectApprover>& tasksApprover) const;
 
     process::Future<process::http::Response> createVolumes(
         const mesos::master::Call& call,
@@ -1489,24 +1489,32 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
-    process::Future<mesos::master::Response::GetFrameworks> _getFrameworks(
-        const Option<std::string>& principal) const;
+    mesos::master::Response::GetFrameworks _getFrameworks(
+        const process::Owned<ObjectApprover>& frameworksApprover) const;
 
     process::Future<process::http::Response> getExecutors(
         const mesos::master::Call& call,
         const Option<std::string>& principal,
         ContentType contentType) const;
 
-    process::Future<mesos::master::Response::GetExecutors> _getExecutors(
-        const Option<std::string>& principal) const;
+    mesos::master::Response::GetExecutors _getExecutors(
+        const process::Owned<ObjectApprover>& frameworksApprover,
+        const process::Owned<ObjectApprover>& executorsApprover) const;
 
     process::Future<process::http::Response> getState(
         const mesos::master::Call& call,
         const Option<std::string>& principal,
         ContentType contentType) const;
 
-    process::Future<mesos::master::Response::GetState> _getState(
-        const Option<std::string>& principal) const;
+    mesos::master::Response::GetState _getState(
+        const process::Owned<ObjectApprover>& frameworksApprover,
+        const process::Owned<ObjectApprover>& taskApprover,
+        const process::Owned<ObjectApprover>& executorsApprover) const;
+
+    process::Future<process::http::Response> subscribe(
+        const mesos::master::Call& call,
+        const Option<std::string>& principal,
+        ContentType contentType) const;
 
     Master* master;