You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/03/07 01:26:11 UTC

[5/7] mesos git commit: Updated the master to handle frameworks that change their roles.

Updated the master to handle frameworks that change their roles.

Review: https://reviews.apache.org/r/57110


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/68bb8d1f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/68bb8d1f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/68bb8d1f

Branch: refs/heads/master
Commit: 68bb8d1f0eb13e910906fde2879900ac433b3500
Parents: 98c7722
Author: Michael Park <mp...@apache.org>
Authored: Sun Mar 5 19:01:29 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Mon Mar 6 16:06:15 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 168 +++++++++++++++---------------
 src/master/master.hpp | 252 ++++++++++++++++++++++++++++++---------------
 2 files changed, 256 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/68bb8d1f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d6d954e..dd1e4cd 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -382,6 +382,56 @@ struct BoundedRateLimiter
 };
 
 
+bool Framework::isTrackedUnderRole(const std::string& role) const
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  return master->roles.contains(role) &&
+         master->roles.at(role)->frameworks.contains(id());
+}
+
+void Framework::trackUnderRole(const std::string& role)
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  CHECK(!isTrackedUnderRole(role));
+
+  CHECK(roles.count(role) > 0);
+
+  if (!master->roles.contains(role)) {
+    master->roles[role] = new Role(role);
+  }
+  master->roles.at(role)->addFramework(this);
+}
+
+void Framework::untrackUnderRole(const std::string& role)
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  CHECK(isTrackedUnderRole(role));
+
+  // NOTE: Ideally we would also `CHECK` that we're not currently subscribed
+  // to the role. We don't do this currently because this function is used in
+  // `Master::removeFramework` where we're still subscribed to `roles`.
+
+  auto allocatedToRole = [&role](const Resource& resource) {
+    return resource.allocation_info().role() == role;
+  };
+
+  CHECK(totalUsedResources.filter(allocatedToRole).empty());
+  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+
+  master->roles.at(role)->removeFramework(this);
+  if (master->roles.at(role)->frameworks.empty()) {
+    delete master->roles.at(role);
+    master->roles.erase(role);
+  }
+}
+
+
 void Master::initialize()
 {
   LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
@@ -2641,24 +2691,8 @@ void Master::_subscribe(
   if (!framework->recovered()) {
     // The framework has previously been registered with this master;
     // it may or may not currently be connected.
-    LOG(INFO) << "Updating info for framework " << framework->id();
-
-    Try<Nothing> updateFrameworkInfo =
-      framework->updateFrameworkInfo(frameworkInfo);
-
-    if (updateFrameworkInfo.isError()) {
-      LOG(INFO) << "Could not update FrameworkInfo of framework '"
-                << frameworkInfo.name() << "': " << updateFrameworkInfo.error();
-
-      FrameworkErrorMessage message;
-      message.set_message(updateFrameworkInfo.error());
-      http.send(message);
-      http.close();
-      return;
-    }
-
-    allocator->updateFramework(framework->id(), framework->info);
 
+    updateFramework(framework, frameworkInfo);
     framework->reregisteredTime = Clock::now();
 
     // Always failover the old framework connection. See MESOS-4712 for details.
@@ -2951,22 +2985,7 @@ void Master::_subscribe(
     // It is now safe to update the framework fields since the request is now
     // guaranteed to be successful. We use the fields passed in during
     // re-registration.
-    LOG(INFO) << "Updating info for framework " << framework->id();
-
-    Try<Nothing> updateFrameworkInfo =
-      framework->updateFrameworkInfo(frameworkInfo);
-
-    if (updateFrameworkInfo.isError()) {
-      LOG(INFO) << "Could not update frameworkInfo of framework '" << *framework
-                << "': " << updateFrameworkInfo.error();
-
-      FrameworkErrorMessage message;
-      message.set_message(updateFrameworkInfo.error());
-      send(from, message);
-      return;
-    }
-
-    allocator->updateFramework(framework->id(), framework->info);
+    updateFramework(framework, frameworkInfo);
 
     framework->reregisteredTime = Clock::now();
 
@@ -6036,6 +6055,36 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 }
 
 
+void Master::updateFramework(
+    Framework* framework,
+    const FrameworkInfo& frameworkInfo)
+{
+  LOG(INFO) << "Updating info for framework " << framework->id();
+
+  // NOTE: The allocator takes care of activating/deactivating
+  // the frameworks from the added/removed roles, respectively.
+  allocator->updateFramework(framework->id(), frameworkInfo);
+
+  // First, remove the offers allocated to roles being removed.
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
+    if (newRoles.count(offer->allocation_info().role()) > 0) {
+      continue;
+    }
+
+    allocator->recoverResources(
+        offer->framework_id(),
+        offer->slave_id(),
+        offer->resources(),
+        None());
+
+    removeOffer(offer, true); // Rescind!
+  }
+
+  framework->update(frameworkInfo);
+}
+
+
 void Master::updateSlave(
     const SlaveID& slaveId,
     const Resources& oversubscribedResources)
@@ -7458,25 +7507,6 @@ void Master::addFramework(Framework* framework)
     }
   }
 
-  auto addFrameworkRole = [this](Framework* framework, const string& role) {
-    CHECK(isWhitelistedRole(role))
-      << "Unknown role '" << role << "'"
-      << " of framework " << *framework;
-
-    if (!roles.contains(role)) {
-      roles[role] = new Role(role);
-    }
-    roles.at(role)->addFramework(framework);
-  };
-
-  if (framework->capabilities.multiRole) {
-    foreach (const string& role, framework->info.roles()) {
-      addFrameworkRole(framework, role);
-    }
-  } else {
-    addFrameworkRole(framework, framework->info.role());
-  }
-
   // There should be no offered resources yet!
   CHECK_EQ(Resources(), framework->totalOfferedResources);
 
@@ -7552,17 +7582,7 @@ Try<Nothing> Master::activateRecoveredFramework(
   CHECK(framework->pid.isNone());
   CHECK(framework->http.isNone());
 
-  // The `FrameworkInfo` might have changed.
-  LOG(INFO) << "Updating info for framework " << framework->id();
-
-  Try<Nothing> updateFrameworkInfo =
-    framework->updateFrameworkInfo(frameworkInfo);
-
-  if (updateFrameworkInfo.isError()) {
-    return updateFrameworkInfo;
-  }
-
-  allocator->updateFramework(framework->id(), framework->info);
+  updateFramework(framework, frameworkInfo);
 
   // Updating `registeredTime` here is debatable: ideally,
   // `registeredTime` would be the time at which the framework first
@@ -7897,26 +7917,8 @@ void Master::removeFramework(Framework* framework)
 
   framework->unregisteredTime = Clock::now();
 
-  auto removeFrameworkRole = [this](Framework* framework, const string& role) {
-    CHECK(isWhitelistedRole(role))
-      << "Unknown role '" << role << "'"
-      << " of framework " << *framework;
-
-    CHECK(roles.contains(role));
-
-    roles[role]->removeFramework(framework);
-    if (roles[role]->frameworks.empty()) {
-      delete roles[role];
-      roles.erase(role);
-    }
-  };
-
-  if (framework->capabilities.multiRole) {
-    foreach (const string& role, framework->info.roles()) {
-      removeFrameworkRole(framework, role);
-    }
-  } else {
-    removeFrameworkRole(framework, framework->info.role());
+  foreach (const string& role, framework->roles) {
+    framework->untrackUnderRole(role);
   }
 
   // TODO(anand): This only works for pid based frameworks. We would

http://git-wip-us.apache.org/repos/asf/mesos/blob/68bb8d1f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1738eeb..d92c8ad 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -602,6 +602,10 @@ protected:
   // executors and recover the resources.
   void removeFramework(Slave* slave, Framework* framework);
 
+  void updateFramework(
+      Framework* framework,
+      const FrameworkInfo& frameworkInfo);
+
   void disconnect(Framework* framework);
   void deactivate(Framework* framework, bool rescind);
 
@@ -1820,7 +1824,10 @@ private:
   hashmap<OfferID, InverseOffer*> inverseOffers;
   hashmap<OfferID, process::Timer> inverseOfferTimers;
 
-  // Roles with > 0 frameworks currently registered.
+  // We track information about roles that we're aware of in the system.
+  // Specifically, we keep track of the roles when a framework subscribes to
+  // the role, and/or when there are resources allocated to the role
+  // (e.g. some tasks and/or executors are consuming resources under the role).
   hashmap<std::string, Role*> roles;
 
   // Configured role whitelist if using the (deprecated) "explicit
@@ -2210,48 +2217,30 @@ struct Framework
     ACTIVE
   };
 
-  Framework(Master* const _master,
+  Framework(Master* const master,
             const Flags& masterFlags,
-            const FrameworkInfo& _info,
+            const FrameworkInfo& info,
             const process::UPID& _pid,
             const process::Time& time = process::Clock::now())
-    : master(_master),
-      info(_info),
-      roles(protobuf::framework::getRoles(_info)),
-      capabilities(_info.capabilities()),
-      pid(_pid),
-      state(ACTIVE),
-      registeredTime(time),
-      reregisteredTime(time),
-      completedTasks(masterFlags.max_completed_tasks_per_framework),
-      unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {}
+    : Framework(master, masterFlags, info, ACTIVE, time)
+  {
+    pid = _pid;
+  }
 
-  Framework(Master* const _master,
+  Framework(Master* const master,
             const Flags& masterFlags,
-            const FrameworkInfo& _info,
+            const FrameworkInfo& info,
             const HttpConnection& _http,
             const process::Time& time = process::Clock::now())
-    : master(_master),
-      info(_info),
-      roles(protobuf::framework::getRoles(_info)),
-      capabilities(_info.capabilities()),
-      http(_http),
-      state(ACTIVE),
-      registeredTime(time),
-      reregisteredTime(time),
-      completedTasks(masterFlags.max_completed_tasks_per_framework),
-      unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {}
+    : Framework(master, masterFlags, info, ACTIVE, time)
+  {
+    http = _http;
+  }
 
-  Framework(Master* const _master,
+  Framework(Master* const master,
             const Flags& masterFlags,
-            const FrameworkInfo& _info)
-    : master(_master),
-      info(_info),
-      roles(protobuf::framework::getRoles(_info)),
-      capabilities(_info.capabilities()),
-      state(RECOVERED),
-      completedTasks(masterFlags.max_completed_tasks_per_framework),
-      unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {}
+            const FrameworkInfo& info)
+    : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
 
   ~Framework()
   {
@@ -2286,6 +2275,18 @@ struct Framework
     if (!Master::isRemovable(task->state())) {
       totalUsedResources += task->resources();
       usedResources[task->slave_id()] += task->resources();
+
+      // It's possible that we're not tracking the task's role for
+      // this framework if the role is absent from the framework's
+      // set of roles. In this case, we track the role's allocation
+      // for this framework.
+      CHECK(!task->resources().empty());
+      const std::string& role =
+        task->resources().begin()->allocation_info().role();
+
+      if (!isTrackedUnderRole(role)) {
+        trackUnderRole(role);
+      }
     }
   }
 
@@ -2297,7 +2298,6 @@ struct Framework
   // functionally for all tasks is expensive, for now.
   void recoverResources(Task* task)
   {
-    CHECK(Master::isRemovable(task->state()));
     CHECK(tasks.contains(task->task_id()))
       << "Unknown task " << task->task_id()
       << " of framework " << task->framework_id();
@@ -2307,6 +2307,23 @@ struct Framework
     if (usedResources[task->slave_id()].empty()) {
       usedResources.erase(task->slave_id());
     }
+
+    // If we are no longer subscribed to the role to which these resources are
+    // being returned, and we have no more resources allocated to us for that
+    // role, stop tracking the framework under the role.
+    CHECK(!task->resources().empty());
+    const std::string& role =
+      task->resources().begin()->allocation_info().role();
+
+    auto allocatedToRole = [&role](const Resource& resource) {
+      return resource.allocation_info().role() == role;
+    };
+
+    if (roles.count(role) == 0 &&
+        totalUsedResources.filter(allocatedToRole).empty()) {
+      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+      untrackUnderRole(role);
+    }
   }
 
   // Sends a message to the connected framework.
@@ -2355,11 +2372,7 @@ struct Framework
       << " of framework " << task->framework_id();
 
     if (!Master::isRemovable(task->state())) {
-      totalUsedResources -= task->resources();
-      usedResources[task->slave_id()] -= task->resources();
-      if (usedResources[task->slave_id()].empty()) {
-        usedResources.erase(task->slave_id());
-      }
+      recoverResources(task);
     }
 
     if (task->state() == TASK_UNREACHABLE) {
@@ -2431,6 +2444,19 @@ struct Framework
     executors[slaveId][executorInfo.executor_id()] = executorInfo;
     totalUsedResources += executorInfo.resources();
     usedResources[slaveId] += executorInfo.resources();
+
+    // It's possible that we're not tracking the executor's role for
+    // this framework if the role is absent from the framework's
+    // set of roles. In this case, we track the role's allocation
+    // for this framework.
+    if (!executorInfo.resources().empty()) {
+      const std::string& role =
+        executorInfo.resources().begin()->allocation_info().role();
+
+      if (!isTrackedUnderRole(role)) {
+        trackUnderRole(role);
+      }
+    }
   }
 
   void removeExecutor(const SlaveID& slaveId,
@@ -2441,12 +2467,32 @@ struct Framework
       << "' of framework " << id()
       << " of agent " << slaveId;
 
-    totalUsedResources -= executors[slaveId][executorId].resources();
-    usedResources[slaveId] -= executors[slaveId][executorId].resources();
+    const ExecutorInfo& executorInfo = executors[slaveId][executorId];
+
+    totalUsedResources -= executorInfo.resources();
+    usedResources[slaveId] -= executorInfo.resources();
     if (usedResources[slaveId].empty()) {
       usedResources.erase(slaveId);
     }
 
+    // If we are no longer subscribed to the role to which these resources are
+    // being returned, and we have no more resources allocated to us for that
+    // role, stop tracking the framework under the role.
+    if (!executorInfo.resources().empty()) {
+      const std::string& role =
+        executorInfo.resources().begin()->allocation_info().role();
+
+      auto allocatedToRole = [&role](const Resource& resource) {
+        return resource.allocation_info().role() == role;
+      };
+
+      if (roles.count(role) == 0 &&
+          totalUsedResources.filter(allocatedToRole).empty()) {
+        CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+        untrackUnderRole(role);
+      }
+    }
+
     executors[slaveId].erase(executorId);
     if (executors[slaveId].empty()) {
       executors.erase(slaveId);
@@ -2456,55 +2502,32 @@ struct Framework
   const FrameworkID id() const { return info.id(); }
 
   // Update fields in 'info' using those in 'newInfo'. Currently this
-  // only updates 'name', 'failover_timeout', 'hostname', 'webui_url',
-  // 'capabilities', and 'labels'.
-  Try<Nothing> updateFrameworkInfo(const FrameworkInfo& newInfo)
+  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
+  // 'webui_url', 'capabilities', and 'labels'.
+  void update(const FrameworkInfo& newInfo)
   {
     // We only merge 'info' from the same framework 'id'.
     CHECK_EQ(info.id(), newInfo.id());
 
+    // Save the old list of roles for later.
+    std::set<std::string> oldRoles = roles;
+
     // TODO(jmlvanre): Merge other fields as per design doc in
     // MESOS-703.
 
-    // We currently do not allow frameworks to add or remove roles. We
-    // do however allow frameworks to opt in and out of `MULTI_ROLE`
-    // capability, given that the `role` and `roles` field contain the
-    // same number of roles.
-    if (capabilities.multiRole || protobuf::frameworkHasCapability(
-            newInfo, FrameworkInfo::Capability::MULTI_ROLE)) {
-      // Two `roles` sets are equivalent if they contain the same
-      // elements. A `role` `*` is not equivalent to an empty `roles`
-      // set, but to the set `{*}`. Since we might be dealing with a
-      // framework upgrading to `MULTI_ROLE` capability or dropping
-      // it, we need to examine either `role` or `roles` in order to
-      // determine the roles a framework is subscribed to.
-      const std::set<std::string> newRoles =
-        protobuf::framework::getRoles(newInfo);
-
-      if (roles != newRoles) {
-        return Error(
-            "Frameworks cannot change their roles: expected '" +
-            stringify(roles) + "', but got '" + stringify(newRoles) + "'");
-      }
-
-      info.clear_role();
-      info.clear_roles();
+    info.clear_role();
+    info.clear_roles();
 
-      if (newInfo.has_role()) {
-        info.set_role(newInfo.role());
-      }
+    if (newInfo.has_role()) {
+      info.set_role(newInfo.role());
+    }
 
-      if (newInfo.roles_size() > 0) {
-        info.mutable_roles()->CopyFrom(newInfo.roles());
-      }
-    } else {
-      if (newInfo.role() != info.role()) {
-        LOG(WARNING) << "Cannot update FrameworkInfo.role to '"
-                     << newInfo.role() << "' for framework " << id()
-                     << ". Check MESOS-703";
-      }
+    if (newInfo.roles_size() > 0) {
+      info.mutable_roles()->CopyFrom(newInfo.roles());
     }
 
+    roles = protobuf::framework::getRoles(newInfo);
+
     if (newInfo.user() != info.user()) {
       LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
                    << "' for framework " << id() << ". Check MESOS-703";
@@ -2555,7 +2578,45 @@ struct Framework
       info.clear_labels();
     }
 
-    return Nothing();
+    const std::set<std::string>& newRoles = roles;
+
+    const std::set<std::string> removedRoles = [&]() {
+      std::set<std::string> result = oldRoles;
+      foreach (const std::string& role, newRoles) {
+        result.erase(role);
+      }
+      return result;
+    }();
+
+    foreach (const std::string& role, removedRoles) {
+      auto allocatedToRole = [&role](const Resource& resource) {
+        return resource.allocation_info().role() == role;
+      };
+
+      // Stop tracking the framework under this role if there are
+      // no longer any resources allocated to it.
+      if (totalUsedResources.filter(allocatedToRole).empty()) {
+        CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+        untrackUnderRole(role);
+      }
+    }
+
+    const std::set<std::string> addedRoles = [&]() {
+      std::set<std::string> result = newRoles;
+      foreach (const std::string& role, oldRoles) {
+        result.erase(role);
+      }
+      return result;
+    }();
+
+    foreach (const std::string& role, addedRoles) {
+      // NOTE: It's possible that we're already tracking this framework
+      // under the role because a framework can unsubscribe from a role
+      // while it still has resources allocated to the role.
+      if (!isTrackedUnderRole(role)) {
+        trackUnderRole(role);
+      }
+    }
   }
 
   void updateConnection(const process::UPID& newPid)
@@ -2628,6 +2689,10 @@ struct Framework
   bool connected() const { return state == ACTIVE || state == INACTIVE; }
   bool recovered() const { return state == RECOVERED; }
 
+  bool isTrackedUnderRole(const std::string& role) const;
+  void trackUnderRole(const std::string& role);
+  void untrackUnderRole(const std::string& role);
+
   Master* const master;
 
   FrameworkInfo info;
@@ -2719,6 +2784,31 @@ struct Framework
   Option<process::Owned<Heartbeater>> heartbeater;
 
 private:
+  Framework(Master* const _master,
+            const Flags& masterFlags,
+            const FrameworkInfo& _info,
+            State state,
+            const process::Time& time)
+    : master(_master),
+      info(_info),
+      roles(protobuf::framework::getRoles(_info)),
+      capabilities(_info.capabilities()),
+      state(state),
+      registeredTime(time),
+      reregisteredTime(time),
+      completedTasks(masterFlags.max_completed_tasks_per_framework),
+      unreachableTasks(masterFlags.max_unreachable_tasks_per_framework)
+  {
+    foreach (const std::string& role, roles) {
+      // NOTE: It's possible that we're already being tracked under the role
+      // because a framework can unsubscribe from a role while it still has
+      // resources allocated to the role.
+      if (!isTrackedUnderRole(role)) {
+        trackUnderRole(role);
+      }
+    }
+  }
+
   Framework(const Framework&);              // No copying.
   Framework& operator=(const Framework&); // No assigning.
 };