You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/01/22 23:50:17 UTC

mesos git commit: Fixed dropped events on the master operator API stream.

Repository: mesos
Updated Branches:
  refs/heads/master 336e93219 -> cf3311847


Fixed dropped events on the master operator API stream.

The master's `Subscribers::send()` call path previously read
event-related master state (the framework and task) only after
asynchronous authorization calls had completed. Thus, if that state
changed after the call to `Subscribers::send()` but before the event
was actually sent, the event could be dropped.

This issue was observed when a `TASK_KILLED` status update failed to
be sent on an operator's stream because the task was removed from
master state after `Subscribers::send()` was called but before the
asynchronous calls completed.

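This patch updates that call path to capture, at the time
`Subscribers::send()` is called, a shared copy of the event and its
related metadata (the framework's `FrameworkInfo` and, for task
updates, the `Task`). The asynchronous continuations therefore have a
consistent view of the master's state at the time of the event.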

Review: https://reviews.apache.org/r/65253/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf331184
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf331184
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf331184

Branch: refs/heads/master
Commit: cf331184714f692f21988a53fd04fa64fbbb3aba
Parents: 336e932
Author: Greg Mann <gr...@mesosphere.io>
Authored: Mon Jan 22 15:46:34 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Jan 22 15:46:34 2018 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 144 +++++++++++++++++++++++++++++----------------
 src/master/master.hpp |  28 ++++++---
 2 files changed, 114 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cf331184/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3af96b1..7b6b5c4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -319,6 +319,7 @@ Master::Master(
     detector(_detector),
     authorizer(_authorizer),
     frameworks(flags),
+    subscribers(this),
     authenticator(None()),
     metrics(new Metrics(*this)),
     electedTime(None())
@@ -11156,38 +11157,84 @@ static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo)
 }
 
 
-void Master::Subscribers::send(const mesos::master::Event& event)
+void Master::Subscribers::send(mesos::master::Event&& event)
 {
   VLOG(1) << "Notifying all active subscribers about " << event.type()
           << " event";
 
+  Option<Shared<FrameworkInfo>> frameworkInfo;
+  Option<Shared<Task>> task;
+
+  // Copy metadata associated with the event if necessary, so that we capture
+  // the current state before we make asynchronous calls below.
+  switch (event.type()) {
+    case mesos::master::Event::TASK_ADDED: {
+      Framework* framework =
+        master->getFramework(event.task_added().task().framework_id());
+
+      CHECK_NOTNULL(framework);
+
+      frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info));
+      break;
+    }
+    case mesos::master::Event::TASK_UPDATED: {
+      Framework* framework =
+        master->getFramework(event.task_updated().framework_id());
+
+      CHECK_NOTNULL(framework);
+
+      frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info));
+
+      Task* storedTask =
+        framework->getTask(event.task_updated().status().task_id());
+
+      CHECK_NOTNULL(storedTask);
+
+      task = Shared<Task>(new Task(*storedTask));
+      break;
+    }
+    case mesos::master::Event::FRAMEWORK_ADDED:
+    case mesos::master::Event::FRAMEWORK_UPDATED:
+    case mesos::master::Event::FRAMEWORK_REMOVED:
+    case mesos::master::Event::AGENT_ADDED:
+    case mesos::master::Event::AGENT_REMOVED:
+    case mesos::master::Event::SUBSCRIBED:
+    case mesos::master::Event::HEARTBEAT:
+    case mesos::master::Event::UNKNOWN:
+      break;
+  }
+
+  // Create a single copy of the event for all subscribers to share.
+  Shared<mesos::master::Event> sharedEvent(
+      new mesos::master::Event(std::move(event)));
+
   foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
     Future<Owned<AuthorizationAcceptor>> authorizeRole =
       AuthorizationAcceptor::create(
           subscriber->principal,
-          subscriber->master->authorizer,
+          master->authorizer,
           authorization::VIEW_ROLE);
 
     Future<Owned<AuthorizationAcceptor>> authorizeFramework =
       AuthorizationAcceptor::create(
           subscriber->principal,
-          subscriber->master->authorizer,
+          master->authorizer,
           authorization::VIEW_FRAMEWORK);
 
     Future<Owned<AuthorizationAcceptor>> authorizeTask =
       AuthorizationAcceptor::create(
           subscriber->principal,
-          subscriber->master->authorizer,
+          master->authorizer,
           authorization::VIEW_TASK);
 
     Future<Owned<AuthorizationAcceptor>> authorizeExecutor =
       AuthorizationAcceptor::create(
           subscriber->principal,
-          subscriber->master->authorizer,
+          master->authorizer,
           authorization::VIEW_EXECUTOR);
 
     collect(authorizeRole, authorizeFramework, authorizeTask, authorizeExecutor)
-      .then(defer(subscriber->master->self(),
+      .then(defer(master->self(),
           [=](const tuple<Owned<AuthorizationAcceptor>,
                           Owned<AuthorizationAcceptor>,
                           Owned<AuthorizationAcceptor>,
@@ -11202,11 +11249,14 @@ void Master::Subscribers::send(const mesos::master::Event& event)
             authorizeTask,
             authorizeExecutor) = acceptors;
 
-        subscriber->send(event,
+        subscriber->send(
+            sharedEvent,
             authorizeRole,
             authorizeFramework,
             authorizeTask,
-            authorizeExecutor);
+            authorizeExecutor,
+            frameworkInfo,
+            task);
 
         return Nothing();
       }));
@@ -11215,52 +11265,43 @@ void Master::Subscribers::send(const mesos::master::Event& event)
 
 
 void Master::Subscribers::Subscriber::send(
-    const mesos::master::Event& event,
+    const Shared<mesos::master::Event>& event,
     const Owned<AuthorizationAcceptor>& authorizeRole,
     const Owned<AuthorizationAcceptor>& authorizeFramework,
     const Owned<AuthorizationAcceptor>& authorizeTask,
-    const Owned<AuthorizationAcceptor>& authorizeExecutor)
+    const Owned<AuthorizationAcceptor>& authorizeExecutor,
+    const Option<Shared<FrameworkInfo>>& frameworkInfo,
+    const Option<Shared<Task>>& task)
 {
-  switch (event.type()) {
+  switch (event->type()) {
     case mesos::master::Event::TASK_ADDED: {
-      Framework* framework =
-        master->getFramework(event.task_added().task().framework_id());
-
-      if (framework == nullptr) {
-        break;
-      }
+      CHECK_SOME(frameworkInfo);
+      CHECK_NOTNULL(&frameworkInfo.get());
 
-      if (authorizeTask->accept(event.task_added().task(), framework->info) &&
-          authorizeFramework->accept(framework->info)) {
-        http.send<mesos::master::Event, v1::master::Event>(event);
+      if (authorizeTask->accept(
+              event->task_added().task(), *frameworkInfo.get()) &&
+          authorizeFramework->accept(*frameworkInfo.get())) {
+        http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
     }
     case mesos::master::Event::TASK_UPDATED: {
-      Framework* framework =
-        master->getFramework(event.task_updated().framework_id());
-
-      if (framework == nullptr) {
-        break;
-      }
-
-      Task* task =
-        framework->getTask(event.task_updated().status().task_id());
+      CHECK_SOME(frameworkInfo);
+      CHECK_NOTNULL(&frameworkInfo.get());
 
-      if (task == nullptr) {
-        break;
-      }
+      CHECK_SOME(task);
+      CHECK_NOTNULL(&task.get());
 
-      if (authorizeTask->accept(*task, framework->info) &&
-          authorizeFramework->accept(framework->info)) {
-        http.send<mesos::master::Event, v1::master::Event>(event);
+      if (authorizeTask->accept(*task.get(), *frameworkInfo.get()) &&
+          authorizeFramework->accept(*frameworkInfo.get())) {
+        http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
     }
     case mesos::master::Event::FRAMEWORK_ADDED: {
       if (authorizeFramework->accept(
-              event.framework_added().framework().framework_info())) {
-        mesos::master::Event event_(event);
+              event->framework_added().framework().framework_info())) {
+        mesos::master::Event event_(*event);
         event_.mutable_framework_added()->mutable_framework()->
           mutable_allocated_resources()->Clear();
         event_.mutable_framework_added()->mutable_framework()->
@@ -11268,7 +11309,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event.framework_added().framework().allocated_resources()) {
+            event->framework_added().framework().allocated_resources()) {
           if (authorizeResource(resource, authorizeRole)) {
             event_.mutable_framework_added()->mutable_framework()->
               add_allocated_resources()->CopyFrom(resource);
@@ -11277,7 +11318,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event.framework_added().framework().offered_resources()) {
+            event->framework_added().framework().offered_resources()) {
           if (authorizeResource(resource, authorizeRole)) {
             event_.mutable_framework_added()->mutable_framework()->
               add_offered_resources()->CopyFrom(resource);
@@ -11290,8 +11331,8 @@ void Master::Subscribers::Subscriber::send(
     }
     case mesos::master::Event::FRAMEWORK_UPDATED: {
       if (authorizeFramework->accept(
-              event.framework_updated().framework().framework_info())) {
-        mesos::master::Event event_(event);
+              event->framework_updated().framework().framework_info())) {
+        mesos::master::Event event_(*event);
         event_.mutable_framework_updated()->mutable_framework()->
           mutable_allocated_resources()->Clear();
         event_.mutable_framework_updated()->mutable_framework()->
@@ -11299,7 +11340,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event.framework_updated().framework().allocated_resources()) {
+            event->framework_updated().framework().allocated_resources()) {
           if (authorizeResource(resource, authorizeRole)) {
             event_.mutable_framework_updated()->mutable_framework()->
               add_allocated_resources()->CopyFrom(resource);
@@ -11308,7 +11349,7 @@ void Master::Subscribers::Subscriber::send(
 
         foreach(
             const Resource& resource,
-            event.framework_updated().framework().offered_resources()) {
+            event->framework_updated().framework().offered_resources()) {
           if (authorizeResource(resource, authorizeRole)) {
             event_.mutable_framework_updated()->mutable_framework()->
               add_offered_resources()->CopyFrom(resource);
@@ -11321,19 +11362,19 @@ void Master::Subscribers::Subscriber::send(
     }
     case mesos::master::Event::FRAMEWORK_REMOVED: {
       if (authorizeFramework->accept(
-              event.framework_removed().framework_info())) {
-        http.send<mesos::master::Event, v1::master::Event>(event);
+              event->framework_removed().framework_info())) {
+        http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
     }
     case mesos::master::Event::AGENT_ADDED: {
-      mesos::master::Event event_(event);
+      mesos::master::Event event_(*event);
       event_.mutable_agent_added()->mutable_agent()->
         mutable_total_resources()->Clear();
 
       foreach(
           const Resource& resource,
-          event.agent_added().agent().total_resources()) {
+          event->agent_added().agent().total_resources()) {
         if (authorizeResource(resource, authorizeRole)) {
           event_.mutable_agent_added()->mutable_agent()->add_total_resources()
             ->CopyFrom(resource);
@@ -11343,8 +11384,11 @@ void Master::Subscribers::Subscriber::send(
       http.send<mesos::master::Event, v1::master::Event>(event_);
       break;
     }
-    default:
-      http.send<mesos::master::Event, v1::master::Event>(event);
+    case mesos::master::Event::AGENT_REMOVED:
+    case mesos::master::Event::SUBSCRIBED:
+    case mesos::master::Event::HEARTBEAT:
+    case mesos::master::Event::UNKNOWN:
+      http.send<mesos::master::Event, v1::master::Event>(*event);
       break;
   }
 }
@@ -11380,7 +11424,7 @@ void Master::subscribe(
   subscribers.subscribed.put(
       http.streamId,
       Owned<Subscribers::Subscriber>(
-          new Subscribers::Subscriber{this, http, principal}));
+          new Subscribers::Subscriber{http, principal}));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cf331184/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0513678..1fa428d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1961,6 +1961,8 @@ private:
 
   struct Subscribers
   {
+    Subscribers(Master* _master) : master(_master) {};
+
     // Represents a client subscribed to the 'api/vX' endpoint.
     //
     // TODO(anand): Add support for filtering. Some subscribers
@@ -1968,11 +1970,9 @@ private:
     struct Subscriber
     {
       Subscriber(
-          Master* _master,
           const HttpConnection& _http,
           const Option<process::http::authentication::Principal> _principal)
-        : master(_master),
-          http(_http),
+        : http(_http),
           principal(_principal)
       {
         mesos::master::Event event;
@@ -1994,11 +1994,20 @@ private:
       Subscriber(const Subscriber&) = delete;
       Subscriber& operator=(const Subscriber&) = delete;
 
-      void send(const mesos::master::Event& event,
+      // The `AuthorizationAcceptor` parameters here are not all required for
+      // every event, but we currently construct and pass them all regardless
+      // of the event type.
+      //
+      // TODO(greggomann): Refactor this function into multiple event-specific
+      // overloads. See MESOS-8475.
+      void send(
+          const process::Shared<mesos::master::Event>& event,
           const process::Owned<AuthorizationAcceptor>& authorizeRole,
           const process::Owned<AuthorizationAcceptor>& authorizeFramework,
           const process::Owned<AuthorizationAcceptor>& authorizeTask,
-          const process::Owned<AuthorizationAcceptor>& authorizeExecutor);
+          const process::Owned<AuthorizationAcceptor>& authorizeExecutor,
+          const Option<process::Shared<FrameworkInfo>>& frameworkInfo,
+          const Option<process::Shared<Task>>& task);
 
       ~Subscriber()
       {
@@ -2012,7 +2021,6 @@ private:
         wait(heartbeater.get());
       }
 
-      Master* master;
       HttpConnection http;
       process::Owned<Heartbeater<mesos::master::Event, v1::master::Event>>
         heartbeater;
@@ -2020,12 +2028,16 @@ private:
     };
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
-    void send(const mesos::master::Event& event);
+    void send(mesos::master::Event&& event);
+
+    Master* master;
 
     // Active subscribers to the 'api/vX' endpoint keyed by the stream
     // identifier.
     hashmap<id::UUID, process::Owned<Subscriber>> subscribed;
-  } subscribers;
+  };
+
+  Subscribers subscribers;
 
   hashmap<OfferID, Offer*> offers;
   hashmap<OfferID, process::Timer> offerTimers;