You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/05/19 21:22:14 UTC

[1/3] mesos git commit: Index slaves by UPID in the master.

Repository: mesos
Updated Branches:
  refs/heads/master 26091f461 -> c24268f13


Index slaves by UPID in the master.

Review: https://reviews.apache.org/r/34388


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cf03af
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cf03af
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cf03af

Branch: refs/heads/master
Commit: 42cf03af66f2691d04e5c88ac7e098625d38e0bf
Parents: b19ffd2
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon May 18 18:37:11 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue May 19 11:55:30 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 127 +++++++++++++++++++++++----------------------
 src/master/master.hpp |  65 ++++++++++++++++++++++-
 2 files changed, 129 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index eaea79d..d2df99c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -973,7 +973,9 @@ void Master::exited(const UPID& pid)
     }
   }
 
-  // The semantics when a slave gets disconnected are as follows:
+  // The semantics when a registered slave gets disconnected are as
+  // follows:
+  //
   // 1) If the slave is not checkpointing, the slave is immediately
   //    removed and all tasks running on it are transitioned to LOST.
   //    No resources are recovered, because the slave is removed.
@@ -985,42 +987,42 @@ void Master::exited(const UPID& pid)
   //    2.2) Framework is not-checkpointing: The slave is not removed
   //         but the framework is removed from the slave's structs,
   //         its tasks transitioned to LOST and resources recovered.
-  foreachvalue (Slave* slave, slaves.registered) {
-    if (slave->pid == pid) {
-      LOG(INFO) << "Slave " << *slave << " disconnected";
-
-      if (!slave->info.checkpoint()) {
-        // Remove the slave, if it is not checkpointing.
-        LOG(INFO) << "Removing disconnected slave " << *slave
-                  << " because it is not checkpointing!";
-        removeSlave(slave,
-                    "slave is non-checkpointing and disconnected");
-        return;
-      } else if (slave->connected) {
-        // Checkpointing slaves can just be disconnected.
-        disconnect(slave);
+  if (slaves.registered.contains(pid)) {
+    Slave* slave = slaves.registered.get(pid);
+    CHECK_NOTNULL(slave);
+
+    LOG(INFO) << "Slave " << *slave << " disconnected";
+
+    if (!slave->info.checkpoint()) {
+      // Remove the slave, if it is not checkpointing.
+      LOG(INFO) << "Removing disconnected slave " << *slave
+                << " because it is not checkpointing!";
+      removeSlave(slave, "slave is non-checkpointing and disconnected");
+      return;
+    } else if (slave->connected) {
+      // Checkpointing slaves can just be disconnected.
+      disconnect(slave);
 
-        // Remove all non-checkpointing frameworks.
-        hashset<FrameworkID> frameworkIds =
+      // Remove all non-checkpointing frameworks.
+      hashset<FrameworkID> frameworkIds =
           slave->tasks.keys() | slave->executors.keys();
 
-        foreach (const FrameworkID& frameworkId, frameworkIds) {
-          Framework* framework = getFramework(frameworkId);
-          if (framework != NULL && !framework->info.checkpoint()) {
-            LOG(INFO) << "Removing framework " << *framework
-                      << " from disconnected slave " << *slave
-                      << " because the framework is not checkpointing";
+      foreach (const FrameworkID& frameworkId, frameworkIds) {
+        Framework* framework = getFramework(frameworkId);
+        if (framework != NULL && !framework->info.checkpoint()) {
+          LOG(INFO) << "Removing framework " << *framework
+                    << " from disconnected slave " << *slave
+                    << " because the framework is not checkpointing";
 
-            removeFramework(slave, framework);
-          }
+          removeFramework(slave, framework);
         }
-      } else {
-        // NOTE: A duplicate exited() event is possible for a slave
-        // because its PID doesn't change on restart. See MESOS-675
-        // for details.
-        LOG(WARNING) << "Ignoring duplicate exited() notification for "
-                     << "checkpointing slave " << *slave;
       }
+    } else {
+      // NOTE: A duplicate exited() event is possible for a slave
+      // because its PID doesn't change on restart. See MESOS-675
+      // for details.
+      LOG(WARNING) << "Ignoring duplicate exited() notification for "
+                   << "checkpointing slave " << *slave;
     }
   }
 }
@@ -3094,31 +3096,30 @@ void Master::registerSlave(
   }
 
   // Check if this slave is already registered (because it retries).
-  foreachvalue (Slave* slave, slaves.registered) {
-    if (slave->pid == from) {
-      if (!slave->connected) {
-        // The slave was previously disconnected but it is now trying
-        // to register as a new slave. This could happen if the slave
-        // failed recovery and hence registering as a new slave before
-        // the master removed the old slave from its map.
-        LOG(INFO)
-          << "Removing old disconnected slave " << *slave
-          << " because a registration attempt is being made from " << from;
-        removeSlave(slave,
-                    "a new slave registered at the same address",
-                    metrics->slave_removals_reason_registered);
-        break;
-      } else {
-        CHECK(slave->active)
-            << "Unexpected connected but deactivated slave " << *slave;
-
-        LOG(INFO) << "Slave " << *slave << " already registered,"
-                  << " resending acknowledgement";
-        SlaveRegisteredMessage message;
-        message.mutable_slave_id()->MergeFrom(slave->id);
-        send(from, message);
-        return;
-      }
+  if (slaves.registered.contains(from)) {
+    Slave* slave = slaves.registered.get(from);
+    CHECK_NOTNULL(slave);
+
+    if (!slave->connected) {
+      // The slave was previously disconnected but it is now trying
+      // to register as a new slave. This could happen if the slave
+      // failed recovery and is hence registering as a new slave before
+      // the master removed the old slave from its map.
+      LOG(INFO) << "Removing old disconnected slave " << *slave
+                << " because a registration attempt occurred";
+      removeSlave(slave,
+                  "a new slave registered at the same address",
+                  metrics->slave_removals_reason_registered);
+    } else {
+      CHECK(slave->active)
+        << "Unexpected connected but deactivated slave " << *slave;
+
+      LOG(INFO) << "Slave " << *slave << " already registered,"
+                << " resending acknowledgement";
+      SlaveRegisteredMessage message;
+      message.mutable_slave_id()->MergeFrom(slave->id);
+      send(from, message);
+      return;
     }
   }
 
@@ -3574,7 +3575,8 @@ void Master::exitedExecutor(
     return;
   }
 
-  Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]);
+  Slave* slave = slaves.registered.get(slaveId);
+  CHECK_NOTNULL(slave);
 
   if (!slave->hasExecutor(frameworkId, executorId)) {
     LOG(WARNING) << "Ignoring unknown exited executor '" << executorId
@@ -3624,7 +3626,7 @@ void Master::shutdown(
     return;
   }
 
-  Slave* slave = slaves.registered[shutdown.slave_id()];
+  Slave* slave = slaves.registered.get(shutdown.slave_id());
   CHECK_NOTNULL(slave);
 
   ShutdownExecutorMessage message;
@@ -3643,7 +3645,7 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
     return;
   }
 
-  Slave* slave = slaves.registered[slaveId];
+  Slave* slave = slaves.registered.get(slaveId);
   CHECK_NOTNULL(slave);
 
   LOG(WARNING) << "Shutting down slave " << *slave << " with message '"
@@ -3919,7 +3921,8 @@ void Master::offer(const FrameworkID& frameworkId,
       continue;
     }
 
-    Slave* slave = slaves.registered[slaveId];
+    Slave* slave = slaves.registered.get(slaveId);
+    CHECK_NOTNULL(slave);
 
     CHECK(slave->info.checkpoint() || !framework->info.checkpoint())
         << "Resources of non checkpointing slave " << *slave
@@ -4599,7 +4602,7 @@ void Master::addSlave(
   CHECK_NOTNULL(slave);
 
   slaves.removed.erase(slave->id);
-  slaves.registered[slave->id] = slave;
+  slaves.registered.put(slave);
 
   link(slave->pid);
 
@@ -4734,7 +4737,7 @@ void Master::removeSlave(
 
   // Mark the slave as being removed.
   slaves.removing.insert(slave->id);
-  slaves.registered.erase(slave->id);
+  slaves.registered.remove(slave);
   slaves.removed.put(slave->id, Nothing());
   authenticated.erase(slave->pid);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0922a7c..4a94e23 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1154,7 +1154,70 @@ private:
     // these slaves until the registrar determines their fate.
     hashset<SlaveID> reregistering;
 
-    hashmap<SlaveID, Slave*> registered;
+    // Registered slaves are indexed by SlaveID and UPID. Note that
+    // iteration is supported but is exposed as iteration over a
+    // hashmap<SlaveID, Slave*> since it is tedious to convert
+    // the map's key/value iterator into a value iterator.
+    //
+    // TODO(bmahler): Consider pulling in boost's multi_index,
+    // or creating a simpler indexing abstraction in stout.
+    struct
+    {
+      bool contains(const SlaveID& slaveId) const
+      {
+        return ids.contains(slaveId);
+      }
+
+      bool contains(const process::UPID& pid) const
+      {
+        return pids.contains(pid);
+      }
+
+      Slave* get(const SlaveID& slaveId) const
+      {
+        return ids.get(slaveId).get(NULL);
+      }
+
+      Slave* get(const process::UPID& pid) const
+      {
+        return pids.get(pid).get(NULL);
+      }
+
+      void put(Slave* slave)
+      {
+        CHECK_NOTNULL(slave);
+        ids[slave->id] = slave;
+        pids[slave->pid] = slave;
+      }
+
+      void remove(Slave* slave)
+      {
+        CHECK_NOTNULL(slave);
+        ids.erase(slave->id);
+        pids.erase(slave->pid);
+      }
+
+      void clear()
+      {
+        ids.clear();
+        pids.clear();
+      }
+
+      size_t size() const { return ids.size(); }
+
+      typedef hashmap<SlaveID, Slave*>::iterator iterator;
+      typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
+
+      iterator begin() { return ids.begin(); }
+      iterator end()   { return ids.end();   }
+
+      const_iterator begin() const { return ids.begin(); }
+      const_iterator end()   const { return ids.end();   }
+
+    private:
+      hashmap<SlaveID, Slave*> ids;
+      hashmap<process::UPID, Slave*> pids;
+    } registered;
 
     // Slaves that are in the process of being removed from the
     // registrar. Think of these as being partially removed: we must


[2/3] mesos git commit: Moved up Slave and Framework structs in master.hpp.

Posted by bm...@apache.org.
Moved up Slave and Framework structs in master.hpp.

Review: https://reviews.apache.org/r/34387


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b19ffd2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b19ffd2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b19ffd2f

Branch: refs/heads/master
Commit: b19ffd2fe72d50fbd9f62c4c32eaa0906b2bd177
Parents: 26091f4
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon May 18 18:17:59 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue May 19 11:55:30 2015 -0700

----------------------------------------------------------------------
 src/master/master.hpp | 1951 ++++++++++++++++++++++----------------------
 1 file changed, 974 insertions(+), 977 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b19ffd2f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index da0a835..0922a7c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -89,1228 +89,1225 @@ class Repairer;
 class SlaveObserver;
 
 struct BoundedRateLimiter;
-struct Framework;
-struct Role;
-struct Slave;
 
 
-class Master : public ProtobufProcess<Master>
+struct Slave
 {
-public:
-  Master(mesos::master::allocator::Allocator* allocator,
-         Registrar* registrar,
-         Repairer* repairer,
-         Files* files,
-         MasterContender* contender,
-         MasterDetector* detector,
-         const Option<Authorizer*>& authorizer,
-         const Option<std::shared_ptr<process::RateLimiter>>&
-           slaveRemovalLimiter,
-         const Flags& flags = Flags());
-
-  virtual ~Master();
-
-  // Message handlers.
-  void submitScheduler(
-      const std::string& name);
-
-  void registerFramework(
-      const process::UPID& from,
-      const FrameworkInfo& frameworkInfo);
-
-  void reregisterFramework(
-      const process::UPID& from,
-      const FrameworkInfo& frameworkInfo,
-      bool failover);
-
-  void unregisterFramework(
-      const process::UPID& from,
-      const FrameworkID& frameworkId);
-
-  void deactivateFramework(
-      const process::UPID& from,
-      const FrameworkID& frameworkId);
-
-  // TODO(vinod): Remove this once the old driver is removed.
-  void resourceRequest(
-      const process::UPID& from,
-      const FrameworkID& frameworkId,
-      const std::vector<Request>& requests);
-
-  void launchTasks(
-      const process::UPID& from,
-      const FrameworkID& frameworkId,
-      const std::vector<TaskInfo>& tasks,
-      const Filters& filters,
-      const std::vector<OfferID>& offerIds);
-
-  void reviveOffers(
-      const process::UPID& from,
-      const FrameworkID& frameworkId);
-
-  void killTask(
-      const process::UPID& from,
-      const FrameworkID& frameworkId,
-      const TaskID& taskId);
-
-  void statusUpdateAcknowledgement(
-      const process::UPID& from,
-      const SlaveID& slaveId,
-      const FrameworkID& frameworkId,
-      const TaskID& taskId,
-      const std::string& uuid);
-
-  void schedulerMessage(
-      const process::UPID& from,
-      const SlaveID& slaveId,
-      const FrameworkID& frameworkId,
-      const ExecutorID& executorId,
-      const std::string& data);
-
-  void registerSlave(
-      const process::UPID& from,
-      const SlaveInfo& slaveInfo,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version);
-
-  void reregisterSlave(
-      const process::UPID& from,
-      const SlaveInfo& slaveInfo,
-      const std::vector<Resource>& checkpointedResources,
-      const std::vector<ExecutorInfo>& executorInfos,
-      const std::vector<Task>& tasks,
-      const std::vector<Archive::Framework>& completedFrameworks,
-      const std::string& version);
-
-  void unregisterSlave(
-      const process::UPID& from,
-      const SlaveID& slaveId);
-
-  void statusUpdate(
-      const StatusUpdate& update,
-      const process::UPID& pid);
-
-  void reconcileTasks(
-      const process::UPID& from,
-      const FrameworkID& frameworkId,
-      const std::vector<TaskStatus>& statuses);
-
-  void exitedExecutor(
-      const process::UPID& from,
-      const SlaveID& slaveId,
-      const FrameworkID& frameworkId,
-      const ExecutorID& executorId,
-      int32_t status);
-
-  void shutdownSlave(
-      const SlaveID& slaveId,
-      const std::string& message);
-
-  void authenticate(
-      const process::UPID& from,
-      const process::UPID& pid);
-
-  // TODO(bmahler): It would be preferred to use a unique libprocess
-  // Process identifier (PID is not sufficient) for identifying the
-  // framework instance, rather than relying on re-registration time.
-  void frameworkFailoverTimeout(
-      const FrameworkID& frameworkId,
-      const process::Time& reregisteredTime);
+  Slave(const SlaveInfo& _info,
+        const process::UPID& _pid,
+        const Option<std::string> _version,
+        const process::Time& _registeredTime,
+        const Resources& _checkpointedResources,
+        const std::vector<ExecutorInfo> executorInfos =
+          std::vector<ExecutorInfo>(),
+        const std::vector<Task> tasks =
+          std::vector<Task>())
+    : id(_info.id()),
+      info(_info),
+      pid(_pid),
+      version(_version),
+      registeredTime(_registeredTime),
+      connected(true),
+      active(true),
+      checkpointedResources(_checkpointedResources),
+      observer(NULL)
+  {
+    CHECK(_info.has_id());
 
-  void offer(
-      const FrameworkID& framework,
-      const hashmap<SlaveID, Resources>& resources);
+    Try<Resources> resources = applyCheckpointedResources(
+        info.resources(),
+        _checkpointedResources);
 
-  // Invoked when there is a newly elected leading master.
-  // Made public for testing purposes.
-  void detected(const process::Future<Option<MasterInfo>>& pid);
+    // NOTE: This should be validated during slave recovery.
+    CHECK_SOME(resources);
+    totalResources = resources.get();
 
-  // Invoked when the contender has lost the candidacy.
-  // Made public for testing purposes.
-  void lostCandidacy(const process::Future<Nothing>& lost);
+    foreach (const ExecutorInfo& executorInfo, executorInfos) {
+      CHECK(executorInfo.has_framework_id());
+      addExecutor(executorInfo.framework_id(), executorInfo);
+    }
 
-  // Continuation of recover().
-  // Made public for testing purposes.
-  process::Future<Nothing> _recover(const Registry& registry);
+    foreach (const Task& task, tasks) {
+      addTask(new Task(task));
+    }
+  }
 
-  // Continuation of reregisterSlave().
-  // Made public for testing purposes.
-  // TODO(vinod): Instead of doing this create and use a
-  // MockRegistrar.
-  // TODO(dhamon): Consider FRIEND_TEST macro from gtest.
-  void _reregisterSlave(
-      const SlaveInfo& slaveInfo,
-      const process::UPID& pid,
-      const std::vector<Resource>& checkpointedResources,
-      const std::vector<ExecutorInfo>& executorInfos,
-      const std::vector<Task>& tasks,
-      const std::vector<Archive::Framework>& completedFrameworks,
-      const std::string& version,
-      const process::Future<bool>& readmit);
+  ~Slave() {}
 
-  MasterInfo info() const
+  Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
   {
-    return info_;
+    if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) {
+      return tasks[frameworkId][taskId];
+    }
+    return NULL;
   }
 
-protected:
-  virtual void initialize();
-  virtual void finalize();
-  virtual void exited(const process::UPID& pid);
-  virtual void visit(const process::MessageEvent& event);
-  virtual void visit(const process::ExitedEvent& event);
-
-  // Invoked when the message is ready to be executed after
-  // being throttled.
-  // 'principal' being None indicates it is throttled by
-  // 'defaultLimiter'.
-  void throttled(
-      const process::MessageEvent& event,
-      const Option<std::string>& principal);
-
-  // Continuations of visit().
-  void _visit(const process::MessageEvent& event);
-  void _visit(const process::ExitedEvent& event);
-
-  // Helper method invoked when the capacity for a framework
-  // principal is exceeded.
-  void exceededCapacity(
-      const process::MessageEvent& event,
-      const Option<std::string>& principal,
-      uint64_t capacity);
-
-  // Recovers state from the registrar.
-  process::Future<Nothing> recover();
-  void recoveredSlavesTimeout(const Registry& registry);
+  void addTask(Task* task)
+  {
+    const TaskID& taskId = task->task_id();
+    const FrameworkID& frameworkId = task->framework_id();
 
-  void _registerSlave(
-      const SlaveInfo& slaveInfo,
-      const process::UPID& pid,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const process::Future<bool>& admit);
+    CHECK(!tasks[frameworkId].contains(taskId))
+      << "Duplicate task " << taskId << " of framework " << frameworkId;
 
-  void __reregisterSlave(
-      Slave* slave,
-      const std::vector<Task>& tasks);
+    tasks[frameworkId][taskId] = task;
 
-  // 'authenticate' is the future returned by the authenticator.
-  void _authenticate(
-      const process::UPID& pid,
-      const process::Future<Option<std::string>>& authenticate);
+    if (!protobuf::isTerminalState(task->state())) {
+      usedResources[frameworkId] += task->resources();
+    }
 
-  void authenticationTimeout(process::Future<Option<std::string>> future);
+    LOG(INFO) << "Adding task " << taskId
+              << " with resources " << task->resources()
+              << " on slave " << id << " (" << info.hostname() << ")";
+  }
 
-  void fileAttached(const process::Future<Nothing>& result,
-                    const std::string& path);
+  // Notification of task termination, for resource accounting.
+  // TODO(bmahler): This is a hack for performance. We need to
+  // maintain resource counters because computing task resources
+  // functionally for all tasks is expensive, for now.
+  void taskTerminated(Task* task)
+  {
+    const TaskID& taskId = task->task_id();
+    const FrameworkID& frameworkId = task->framework_id();
 
-  // Invoked when the contender has entered the contest.
-  void contended(const process::Future<process::Future<Nothing>>& candidacy);
+    CHECK(protobuf::isTerminalState(task->state()));
+    CHECK(tasks[frameworkId].contains(taskId))
+      << "Unknown task " << taskId << " of framework " << frameworkId;
 
-  // Task reconciliation, split from the message handler
-  // to allow re-use.
-  void _reconcileTasks(
-      Framework* framework,
-      const std::vector<TaskStatus>& statuses);
+    usedResources[frameworkId] -= task->resources();
+    if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+      usedResources.erase(frameworkId);
+    }
+  }
 
-  // Handles a known re-registering slave by reconciling the master's
-  // view of the slave's tasks and executors.
-  void reconcile(
-      Slave* slave,
-      const std::vector<ExecutorInfo>& executors,
-      const std::vector<Task>& tasks);
+  void removeTask(Task* task)
+  {
+    const TaskID& taskId = task->task_id();
+    const FrameworkID& frameworkId = task->framework_id();
 
-  // 'registerFramework()' continuation.
-  void _registerFramework(
-      const process::UPID& from,
-      const FrameworkInfo& frameworkInfo,
-      const process::Future<Option<Error>>& validationError);
+    CHECK(tasks[frameworkId].contains(taskId))
+      << "Unknown task " << taskId << " of framework " << frameworkId;
 
-  // 'reregisterFramework()' continuation.
-  void _reregisterFramework(
-      const process::UPID& from,
-      const FrameworkInfo& frameworkInfo,
-      bool failover,
-      const process::Future<Option<Error>>& validationError);
+    if (!protobuf::isTerminalState(task->state())) {
+      usedResources[frameworkId] -= task->resources();
+      if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+        usedResources.erase(frameworkId);
+      }
+    }
 
-  // Add a framework.
-  void addFramework(Framework* framework);
+    tasks[frameworkId].erase(taskId);
+    if (tasks[frameworkId].empty()) {
+      tasks.erase(frameworkId);
+    }
 
-  // Replace the scheduler for a framework with a new process ID, in
-  // the event of a scheduler failover.
-  void failoverFramework(Framework* framework, const process::UPID& newPid);
+    killedTasks.remove(frameworkId, taskId);
+  }
 
-  // Kill all of a framework's tasks, delete the framework object, and
-  // reschedule offers that were assigned to this framework.
-  void removeFramework(Framework* framework);
+  void addOffer(Offer* offer)
+  {
+    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
 
-  // Remove a framework from the slave, i.e., remove its tasks and
-  // executors and recover the resources.
-  void removeFramework(Slave* slave, Framework* framework);
+    offers.insert(offer);
+    offeredResources += offer->resources();
+  }
 
-  void disconnect(Framework* framework);
-  void deactivate(Framework* framework);
+  void removeOffer(Offer* offer)
+  {
+    CHECK(offers.contains(offer)) << "Unknown offer " << offer->id();
 
-  void disconnect(Slave* slave);
-  void deactivate(Slave* slave);
+    offeredResources -= offer->resources();
+    offers.erase(offer);
+  }
 
-  // Add a slave.
-  void addSlave(
-      Slave* slave,
-      const std::vector<Archive::Framework>& completedFrameworks =
-        std::vector<Archive::Framework>());
+  bool hasExecutor(const FrameworkID& frameworkId,
+                   const ExecutorID& executorId) const
+  {
+    return executors.contains(frameworkId) &&
+      executors.get(frameworkId).get().contains(executorId);
+  }
 
-  // Remove the slave from the registrar. Called when the slave
-  // does not re-register in time after a master failover.
-  Nothing removeSlave(const Registry::Slave& slave);
+  void addExecutor(const FrameworkID& frameworkId,
+                   const ExecutorInfo& executorInfo)
+  {
+    CHECK(!hasExecutor(frameworkId, executorInfo.executor_id()))
+      << "Duplicate executor " << executorInfo.executor_id()
+      << " of framework " << frameworkId;
 
-  // Remove the slave from the registrar and from the master's state.
-  //
-  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
-  void removeSlave(
-      Slave* slave,
-      const std::string& message,
-      Option<process::metrics::Counter> reason = None());
+    executors[frameworkId][executorInfo.executor_id()] = executorInfo;
+    usedResources[frameworkId] += executorInfo.resources();
+  }
 
-  void _removeSlave(
-      const SlaveInfo& slaveInfo,
-      const std::vector<StatusUpdate>& updates,
-      const process::Future<bool>& removed,
-      const std::string& message,
-      Option<process::metrics::Counter> reason = None());
+  void removeExecutor(const FrameworkID& frameworkId,
+                      const ExecutorID& executorId)
+  {
+    CHECK(hasExecutor(frameworkId, executorId))
+      << "Unknown executor " << executorId << " of framework " << frameworkId;
 
-  // Authorizes the task.
-  // Returns true if task is authorized.
-  // Returns false if task is not authorized.
-  // Returns failure for transient authorization failures.
-  process::Future<bool> authorizeTask(
-      const TaskInfo& task,
-      Framework* framework);
+    usedResources[frameworkId] -=
+      executors[frameworkId][executorId].resources();
 
-  // Add the task and its executor (if not already running) to the
-  // framework and slave. Returns the resources consumed as a result,
-  // which includes resources for the task and its executor
-  // (if not already running).
-  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+    // XXX Remove.
 
-  // Transitions the task, and recovers resources if the task becomes
-  // terminal.
-  void updateTask(Task* task, const StatusUpdate& update);
+    executors[frameworkId].erase(executorId);
+    if (executors[frameworkId].empty()) {
+      executors.erase(frameworkId);
+    }
+  }
 
-  // Removes the task.
-  void removeTask(Task* task);
+  void apply(const Offer::Operation& operation)
+  {
+    Try<Resources> resources = totalResources.apply(operation);
+    CHECK_SOME(resources);
 
-  // Remove an executor and recover its resources.
-  void removeExecutor(
-      Slave* slave,
-      const FrameworkID& frameworkId,
-      const ExecutorID& executorId);
+    totalResources = resources.get();
+    checkpointedResources = totalResources.filter(needCheckpointing);
+  }
 
-  // Updates slave's resources by applying the given operation. It
-  // also updates the allocator and sends a CheckpointResourcesMessage
-  // to the slave with slave's current checkpointed resources.
-  void applyOfferOperation(
-      Framework* framework,
-      Slave* slave,
-      const Offer::Operation& operation);
+  const SlaveID id;
+  const SlaveInfo info;
 
-  // Forwards the update to the framework.
-  void forward(
-      const StatusUpdate& update,
-      const process::UPID& acknowledgee,
-      Framework* framework);
+  process::UPID pid;
 
-  // Remove an offer after specified timeout
-  void offerTimeout(const OfferID& offerId);
+  // The Mesos version of the slave. If set, the slave is >= 0.21.0.
+  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
+  // TODO(bmahler): Make this required once it is always set.
+  const Option<std::string> version;
 
-  // Remove an offer and optionally rescind the offer as well.
-  void removeOffer(Offer* offer, bool rescind = false);
+  process::Time registeredTime;
+  Option<process::Time> reregisteredTime;
 
-  Framework* getFramework(const FrameworkID& frameworkId);
-  Slave* getSlave(const SlaveID& slaveId);
-  Offer* getOffer(const OfferID& offerId);
+  // Slave becomes disconnected when the socket closes.
+  bool connected;
 
-  FrameworkID newFrameworkId();
-  OfferID newOfferId();
-  SlaveID newSlaveId();
+  // Slave becomes deactivated when it gets disconnected. In the
+  // future this might also happen via HTTP endpoint.
+  // No offers will be made for a deactivated slave.
+  bool active;
 
-  Option<Credentials> credentials;
+  // Executors running on this slave.
+  hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
-private:
-  void drop(
-      const process::UPID& from,
-      const scheduler::Call& call,
-      const std::string& message);
+  // Tasks present on this slave.
+  // TODO(bmahler): The task pointer ownership complexity arises from the fact
+  // that we own the pointer here, but it's shared with the Framework struct.
+  // We should find a way to eliminate this.
+  hashmap<FrameworkID, hashmap<TaskID, Task*>> tasks;
 
-  void drop(
-      Framework* framework,
-      const Offer::Operation& operation,
-      const std::string& message);
+  // Tasks that frameworks have asked to be killed.
+  // This is used for reconciliation when the slave re-registers.
+  multihashmap<FrameworkID, TaskID> killedTasks;
 
-  // Call handlers.
-  void receive(
-      const process::UPID& from,
-      const scheduler::Call& call);
+  // Active offers on this slave.
+  hashset<Offer*> offers;
 
-  void accept(
-      Framework* framework,
-      const scheduler::Call::Accept& accept);
+  hashmap<FrameworkID, Resources> usedResources;  // Active task / executors.
+  Resources offeredResources; // Offers.
 
-  void _accept(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    const Resources& offeredResources,
-    const scheduler::Call::Accept& accept,
-    const process::Future<std::list<process::Future<bool>>>& authorizations);
+  // Resources that should be checkpointed by the slave (e.g.,
+  // persistent volumes, dynamic reservations, etc). These are either
+  // in use by a task/executor, or are available for use and will be
+  // re-offered to the framework.
+  Resources checkpointedResources;
 
-  void reconcile(
-      Framework* framework,
-      const scheduler::Call::Reconcile& reconcile);
+  // The current total resources of the slave. Note that this is
+  // different from 'info.resources()' because this also considers
+  // operations (e.g., CREATE, RESERVE) that have been applied.
+  Resources totalResources;
 
-  void kill(
-      Framework* framework,
-      const scheduler::Call::Kill& kill);
+  SlaveObserver* observer;
 
-  void shutdown(
-      Framework* framework,
-      const scheduler::Call::Shutdown& shutdown);
+private:
+  Slave(const Slave&);              // No copying.
+  Slave& operator = (const Slave&); // No assigning.
+};
 
-  bool elected() const
-  {
-    return leader.isSome() && leader.get() == info_;
-  }
 
-  // Inner class used to namespace HTTP route handlers (see
-  // master/http.cpp for implementations).
-  class Http
-  {
-  public:
-    explicit Http(Master* _master) : master(_master) {}
+inline std::ostream& operator << (std::ostream& stream, const Slave& slave)
+{
+  return stream << slave.id << " at " << slave.pid
+                << " (" << slave.info.hostname() << ")";
+}
 
-    // Logs the request, route handlers can compose this with the
-    // desired request handler to get consistent request logging.
-    static void log(const process::http::Request& request);
 
-    // /master/health
-    process::Future<process::http::Response> health(
-        const process::http::Request& request) const;
+// Information about a connected or completed framework.
+// TODO(bmahler): Keeping the task and executor information in sync
+// across the Slave and Framework structs is error prone!
+struct Framework
+{
+  Framework(const FrameworkInfo& _info,
+            const process::UPID& _pid,
+            const process::Time& time = process::Clock::now())
+    : info(_info),
+      pid(_pid),
+      connected(true),
+      active(true),
+      registeredTime(time),
+      reregisteredTime(time),
+      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
 
-    // /master/observe
-    process::Future<process::http::Response> observe(
-        const process::http::Request& request) const;
+  ~Framework() {}
 
-    // /master/redirect
-    process::Future<process::http::Response> redirect(
-        const process::http::Request& request) const;
+  Task* getTask(const TaskID& taskId)
+  {
+    if (tasks.count(taskId) > 0) {
+      return tasks[taskId];
+    } else {
+      return NULL;
+    }
+  }
 
-    // /master/roles.json
-    process::Future<process::http::Response> roles(
-        const process::http::Request& request) const;
+  void addTask(Task* task)
+  {
+    CHECK(!tasks.contains(task->task_id()))
+      << "Duplicate task " << task->task_id()
+      << " of framework " << task->framework_id();
 
-    // /master/teardown and /master/shutdown (deprecated).
-    process::Future<process::http::Response> teardown(
-        const process::http::Request& request) const;
+    tasks[task->task_id()] = task;
 
-    // /master/slaves
-    process::Future<process::http::Response> slaves(
-        const process::http::Request& request) const;
+    if (!protobuf::isTerminalState(task->state())) {
+      totalUsedResources += task->resources();
+      usedResources[task->slave_id()] += task->resources();
+    }
+  }
 
-    // /master/state.json
-    process::Future<process::http::Response> state(
-        const process::http::Request& request) const;
+  // Notification of task termination, for resource accounting.
+  // TODO(bmahler): This is a hack for performance. We need to
+  // maintain resource counters because computing task resources
+  // functionally for all tasks is expensive, for now.
+  void taskTerminated(Task* task)
+  {
+    CHECK(protobuf::isTerminalState(task->state()));
+    CHECK(tasks.contains(task->task_id()))
+      << "Unknown task " << task->task_id()
+      << " of framework " << task->framework_id();
 
-    // /master/state-summary
-    process::Future<process::http::Response> stateSummary(
-        const process::http::Request& request) const;
+    totalUsedResources -= task->resources();
+    usedResources[task->slave_id()] -= task->resources();
+    if (usedResources[task->slave_id()].empty()) {
+      usedResources.erase(task->slave_id());
+    }
+  }
 
-    // /master/tasks.json
-    process::Future<process::http::Response> tasks(
-        const process::http::Request& request) const;
+  void addCompletedTask(const Task& task)
+  {
+    // TODO(adam-mesos): Check if completed task already exists.
+    completedTasks.push_back(std::shared_ptr<Task>(new Task(task)));
+  }
 
-    const static std::string HEALTH_HELP;
-    const static std::string OBSERVE_HELP;
-    const static std::string REDIRECT_HELP;
-    const static std::string SHUTDOWN_HELP;  // Deprecated.
-    const static std::string TEARDOWN_HELP;
-    const static std::string SLAVES_HELP;
-    const static std::string TASKS_HELP;
+  void removeTask(Task* task)
+  {
+    CHECK(tasks.contains(task->task_id()))
+      << "Unknown task " << task->task_id()
+      << " of framework " << task->framework_id();
 
-  private:
-    // Helper for doing authentication, returns the credential used if
-    // the authentication was successful (or none if no credentials
-    // have been given to the master), otherwise an Error.
-    Result<Credential> authenticate(
-        const process::http::Request& request) const;
+    if (!protobuf::isTerminalState(task->state())) {
+      totalUsedResources -= task->resources();
+      usedResources[task->slave_id()] -= task->resources();
+      if (usedResources[task->slave_id()].empty()) {
+        usedResources.erase(task->slave_id());
+      }
+    }
 
-    // Continuations.
-    process::Future<process::http::Response> _teardown(
-        const FrameworkID& id,
-        bool authorized = true) const;
+    addCompletedTask(*task);
 
-    Master* master;
-  };
+    tasks.erase(task->task_id());
+  }
 
-  Master(const Master&);              // No copying.
-  Master& operator = (const Master&); // No assigning.
+  void addOffer(Offer* offer)
+  {
+    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+    offers.insert(offer);
+    totalOfferedResources += offer->resources();
+    offeredResources[offer->slave_id()] += offer->resources();
+  }
 
-  friend struct Metrics;
+  void removeOffer(Offer* offer)
+  {
+    CHECK(offers.find(offer) != offers.end())
+      << "Unknown offer " << offer->id();
 
-  // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
-  // make the following functions friends so that validation functions
-  // can get Offer* and Slave*.
-  friend Offer* validation::offer::getOffer(
-      Master* master, const OfferID& offerId);
+    totalOfferedResources -= offer->resources();
+    offeredResources[offer->slave_id()] -= offer->resources();
+    if (offeredResources[offer->slave_id()].empty()) {
+      offeredResources.erase(offer->slave_id());
+    }
 
-  friend Slave* validation::offer::getSlave(
-      Master* master, const SlaveID& slaveId);
+    offers.erase(offer);
+  }
 
-  const Flags flags;
+  bool hasExecutor(const SlaveID& slaveId,
+                   const ExecutorID& executorId)
+  {
+    return executors.contains(slaveId) &&
+      executors[slaveId].contains(executorId);
+  }
 
-  Option<MasterInfo> leader; // Current leading master.
+  void addExecutor(const SlaveID& slaveId,
+                   const ExecutorInfo& executorInfo)
+  {
+    CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
+      << "Duplicate executor " << executorInfo.executor_id()
+      << " on slave " << slaveId;
 
-  mesos::master::allocator::Allocator* allocator;
-  WhitelistWatcher* whitelistWatcher;
-  Registrar* registrar;
-  Repairer* repairer;
-  Files* files;
+    executors[slaveId][executorInfo.executor_id()] = executorInfo;
+    totalUsedResources += executorInfo.resources();
+    usedResources[slaveId] += executorInfo.resources();
+  }
 
-  MasterContender* contender;
-  MasterDetector* detector;
+  void removeExecutor(const SlaveID& slaveId,
+                      const ExecutorID& executorId)
+  {
+    CHECK(hasExecutor(slaveId, executorId))
+      << "Unknown executor " << executorId
+      << " of framework " << id()
+      << " of slave " << slaveId;
 
-  const Option<Authorizer*> authorizer;
+    totalUsedResources -= executors[slaveId][executorId].resources();
+    usedResources[slaveId] -= executors[slaveId][executorId].resources();
+    if (usedResources[slaveId].empty()) {
+      usedResources.erase(slaveId);
+    }
 
-  MasterInfo info_;
+    executors[slaveId].erase(executorId);
+    if (executors[slaveId].empty()) {
+      executors.erase(slaveId);
+    }
+  }
 
-  // Indicates when recovery is complete. Recovery begins once the
-  // master is elected as a leader.
-  Option<process::Future<Nothing>> recovered;
+  const FrameworkID id() const { return info.id(); }
 
-  struct Slaves
+  // Update fields in 'info' using those in 'source'. Currently this
+  // only updates 'name', 'failover_timeout', 'hostname', and
+  // 'webui_url'.
+  void updateFrameworkInfo(const FrameworkInfo& source)
   {
-    Slaves() : removed(MAX_REMOVED_SLAVES) {}
+    // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because
+    // of MESOS-2559. Once this is fixed we can 'CHECK' that we only
+    // merge 'info' from the same framework 'id'.
 
-    // Imposes a time limit for slaves that we recover from the
-    // registry to re-register with the master.
-    Option<process::Timer> recoveredTimer;
+    // TODO(jmlvanre): Merge other fields as per design doc in
+    // MESOS-703.
 
-    // Slaves that have been recovered from the registrar but have yet
-    // to re-register. We keep a "reregistrationTimer" above to ensure
-    // we remove these slaves if they do not re-register.
-    hashset<SlaveID> recovered;
+    if (source.user() != info.user()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user()
+                   << "' for framework " << id() << ". Check MESOS-703";
+    }
 
-    // Slaves that are in the process of registering.
-    hashset<process::UPID> registering;
+    info.set_name(source.name());
 
-    // Only those slaves that are re-registering for the first time
-    // with this master. We must not answer questions related to
-    // these slaves until the registrar determines their fate.
-    hashset<SlaveID> reregistering;
+    if (source.has_failover_timeout()) {
+      info.set_failover_timeout(source.failover_timeout());
+    } else {
+      info.clear_failover_timeout();
+    }
 
-    hashmap<SlaveID, Slave*> registered;
+    if (source.checkpoint() != info.checkpoint()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '"
+                   << stringify(info.checkpoint()) << "' for framework " << id()
+                   << ". Check MESOS-703";
+    }
 
-    // Slaves that are in the process of being removed from the
-    // registrar. Think of these as being partially removed: we must
-    // not answer questions related to these until they are removed
-    // from the registry.
-    hashset<SlaveID> removing;
+    if (source.role() != info.role()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role()
+                   << "' for framework " << id() << ". Check MESOS-703";
+    }
 
-    // We track removed slaves to preserve the consistency
-    // semantics of the pre-registrar code when a non-strict registrar
-    // is being used. That is, if we remove a slave, we must make
-    // an effort to prevent it from (re-)registering, sending updates,
-    // etc. We keep a cache here to prevent this from growing in an
-    // unbounded manner.
-    // TODO(bmahler): Ideally we could use a cache with set semantics.
-    Cache<SlaveID, Nothing> removed;
+    if (source.has_hostname()) {
+      info.set_hostname(source.hostname());
+    } else {
+      info.clear_hostname();
+    }
 
-    // This rate limiter is used to limit the removal of slaves failing
-    // health checks.
-    // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
-    // a wrapper around libprocess process which is thread safe.
-    Option<std::shared_ptr<process::RateLimiter>> limiter;
+    if (source.principal() != info.principal()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.principal to '"
+                   << info.principal() << "' for framework " << id()
+                   << ". Check MESOS-703";
+    }
 
-    bool transitioning(const Option<SlaveID>& slaveId)
-    {
-      if (slaveId.isSome()) {
-        return recovered.contains(slaveId.get()) ||
-               reregistering.contains(slaveId.get()) ||
-               removing.contains(slaveId.get());
-      } else {
-        return !recovered.empty() ||
-               !reregistering.empty() ||
-               !removing.empty();
-      }
+    if (source.has_webui_url()) {
+      info.set_webui_url(source.webui_url());
+    } else {
+      info.clear_webui_url();
     }
-  } slaves;
+  }
 
-  struct Frameworks
-  {
-    Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
+  FrameworkInfo info;
 
-    hashmap<FrameworkID, Framework*> registered;
-    boost::circular_buffer<std::shared_ptr<Framework>> completed;
+  process::UPID pid;
 
-    // Principals of frameworks keyed by PID.
-    // NOTE: Multiple PIDs can map to the same principal. The
-    // principal is None when the framework doesn't specify it.
-    // The differences between this map and 'authenticated' are:
-    // 1) This map only includes *registered* frameworks. The mapping
-    //    is added when a framework (re-)registers.
-    // 2) This map includes unauthenticated frameworks (when Master
-    //    allows them) if they have principals specified in
-    //    FrameworkInfo.
-    hashmap<process::UPID, Option<std::string>> principals;
+  // Framework becomes disconnected when the socket closes.
+  bool connected;
 
-    // BoundedRateLimiters keyed by the framework principal.
-    // Like Metrics::Frameworks, all frameworks of the same principal
-    // are throttled together at a common rate limit.
-    hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
+  // Framework becomes deactivated when it is disconnected or
+  // the master receives a DeactivateFrameworkMessage.
+  // No offers will be made to a deactivated framework.
+  bool active;
 
-    // The default limiter is for frameworks not specified in
-    // 'flags.rate_limits'.
-    Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
-  } frameworks;
+  process::Time registeredTime;
+  process::Time reregisteredTime;
+  process::Time unregisteredTime;
 
-  hashmap<OfferID, Offer*> offers;
-  hashmap<OfferID, process::Timer> offerTimers;
+  // Tasks that have not yet been launched because they are currently
+  // being authorized.
+  hashmap<TaskID, TaskInfo> pendingTasks;
 
-  hashmap<std::string, Role*> roles;
+  hashmap<TaskID, Task*> tasks;
 
-  // Authenticator names as supplied via flags.
-  std::vector<std::string> authenticatorNames;
+  // NOTE: We use a shared pointer for Task because clang doesn't like
+  // Boost's implementation of circular_buffer with Task (Boost
+  // attempts to do some memset's which are unsafe).
+  boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
 
-  Option<Authenticator*> authenticator;
+  hashset<Offer*> offers; // Active offers for framework.
 
-  // Frameworks/slaves that are currently in the process of authentication.
-  // 'authenticating' future is completed when authenticator
-  // completes authentication.
-  // The future is removed from the map when master completes authentication.
-  hashmap<process::UPID, process::Future<Option<std::string>>> authenticating;
+  hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
-  // Principals of authenticated frameworks/slaves keyed by PID.
-  hashmap<process::UPID, std::string> authenticated;
+  // NOTE: For the used and offered resources below, we keep the
+  // total as well as partitioned by SlaveID.
+  // We expose the total resources via the HTTP endpoint, and we
+  // keep a running total of the resources because looping over the
+  // slaves to sum the resources has led to perf issues (MESOS-1862).
+  // We keep the resources partitioned by SlaveID because non-scalar
+  // resources can be lost when summing them up across multiple
+  // slaves (MESOS-2373).
+  //
+  // Also note that keeping the totals is safe even though it yields
+  // incorrect results for non-scalar resources.
+  //   (1) For overlapping set items / ranges across slaves, these
+  //       will get added N times but only represented once.
+  //   (2) When an initial subtraction occurs (N-1), the resource is
+  //       no longer represented. (This is the source of the bug).
+  //   (3) When any further subtractions occur (N-(1+M)), the
+  //       Resources simply ignores the subtraction since there's
+  //       nothing to remove, so this is safe for now.
 
-  int64_t nextFrameworkId; // Used to give each framework a unique ID.
-  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
-  int64_t nextSlaveId;     // Used to give each slave a unique ID.
+  // TODO(mpark): Strip the non-scalar resources out of the totals
+  // in order to avoid reporting incorrect statistics (MESOS-2623).
 
-  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
-  // thread safe.
-  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
-  // copyable metric types only.
-  std::shared_ptr<Metrics> metrics;
+  // Active task / executor resources.
+  Resources totalUsedResources;
+  hashmap<SlaveID, Resources> usedResources;
 
-  // Gauge handlers.
-  double _uptime_secs()
-  {
-    return (process::Clock::now() - startTime).secs();
-  }
+  // Offered resources.
+  Resources totalOfferedResources;
+  hashmap<SlaveID, Resources> offeredResources;
 
-  double _elected()
-  {
-    return elected() ? 1 : 0;
-  }
+private:
+  Framework(const Framework&);              // No copying.
+  Framework& operator = (const Framework&); // No assigning.
+};
+
+
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const Framework& framework)
+{
+  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
+  // updated on framework failover (MESOS-1784).
+  return stream << framework.id() << " (" << framework.info.name()
+                << ") at " << framework.pid;
+}
 
-  double _slaves_connected();
-  double _slaves_disconnected();
-  double _slaves_active();
-  double _slaves_inactive();
 
-  double _frameworks_connected();
-  double _frameworks_disconnected();
-  double _frameworks_active();
-  double _frameworks_inactive();
+// Information about an active role.
+struct Role
+{
+  explicit Role(const mesos::master::RoleInfo& _info)
+    : info(_info) {}
 
-  double _outstanding_offers()
+  void addFramework(Framework* framework)
   {
-    return offers.size();
+    frameworks[framework->id()] = framework;
   }
 
-  double _event_queue_messages()
+  void removeFramework(Framework* framework)
   {
-    return static_cast<double>(eventCount<process::MessageEvent>());
+    frameworks.erase(framework->id());
   }
 
-  double _event_queue_dispatches()
+  Resources resources() const
   {
-    return static_cast<double>(eventCount<process::DispatchEvent>());
-  }
+    Resources resources;
+    foreachvalue (Framework* framework, frameworks) {
+      resources += framework->totalUsedResources;
+      resources += framework->totalOfferedResources;
+    }
 
-  double _event_queue_http_requests()
-  {
-    return static_cast<double>(eventCount<process::HttpEvent>());
+    return resources;
   }
 
-  double _tasks_staging();
-  double _tasks_starting();
-  double _tasks_running();
+  mesos::master::RoleInfo info;
 
-  double _resources_total(const std::string& name);
-  double _resources_used(const std::string& name);
-  double _resources_percent(const std::string& name);
+  hashmap<FrameworkID, Framework*> frameworks;
+};
 
-  process::Time startTime; // Start time used to calculate uptime.
 
-  Option<process::Time> electedTime; // Time when this master is elected.
+class Master : public ProtobufProcess<Master>
+{
+public:
+  Master(mesos::master::allocator::Allocator* allocator,
+         Registrar* registrar,
+         Repairer* repairer,
+         Files* files,
+         MasterContender* contender,
+         MasterDetector* detector,
+         const Option<Authorizer*>& authorizer,
+         const Option<std::shared_ptr<process::RateLimiter>>&
+           slaveRemovalLimiter,
+         const Flags& flags = Flags());
 
-  // Validates the framework including authorization.
-  // Returns None if the framework is valid.
-  // Returns Error if the framework is invalid.
-  // Returns Failure if authorization returns 'Failure'.
-  process::Future<Option<Error>> validate(
+  virtual ~Master();
+
+  // Message handlers.
+  void submitScheduler(
+      const std::string& name);
+
+  void registerFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo);
+
+  void reregisterFramework(
+      const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
-      const process::UPID& from);
-};
+      bool failover);
 
+  void unregisterFramework(
+      const process::UPID& from,
+      const FrameworkID& frameworkId);
 
-struct Slave
-{
-  Slave(const SlaveInfo& _info,
-        const process::UPID& _pid,
-        const Option<std::string> _version,
-        const process::Time& _registeredTime,
-        const Resources& _checkpointedResources,
-        const std::vector<ExecutorInfo> executorInfos =
-          std::vector<ExecutorInfo>(),
-        const std::vector<Task> tasks =
-          std::vector<Task>())
-    : id(_info.id()),
-      info(_info),
-      pid(_pid),
-      version(_version),
-      registeredTime(_registeredTime),
-      connected(true),
-      active(true),
-      checkpointedResources(_checkpointedResources),
-      observer(NULL)
-  {
-    CHECK(_info.has_id());
+  void deactivateFramework(
+      const process::UPID& from,
+      const FrameworkID& frameworkId);
 
-    Try<Resources> resources = applyCheckpointedResources(
-        info.resources(),
-        _checkpointedResources);
+  // TODO(vinod): Remove this once the old driver is removed.
+  void resourceRequest(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const std::vector<Request>& requests);
 
-    // NOTE: This should be validated during slave recovery.
-    CHECK_SOME(resources);
-    totalResources = resources.get();
+  void launchTasks(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const std::vector<TaskInfo>& tasks,
+      const Filters& filters,
+      const std::vector<OfferID>& offerIds);
 
-    foreach (const ExecutorInfo& executorInfo, executorInfos) {
-      CHECK(executorInfo.has_framework_id());
-      addExecutor(executorInfo.framework_id(), executorInfo);
-    }
+  void reviveOffers(
+      const process::UPID& from,
+      const FrameworkID& frameworkId);
 
-    foreach (const Task& task, tasks) {
-      addTask(new Task(task));
-    }
-  }
+  void killTask(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const TaskID& taskId);
 
-  ~Slave() {}
+  void statusUpdateAcknowledgement(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const TaskID& taskId,
+      const std::string& uuid);
 
-  Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
-  {
-    if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) {
-      return tasks[frameworkId][taskId];
-    }
-    return NULL;
-  }
+  void schedulerMessage(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const std::string& data);
 
-  void addTask(Task* task)
-  {
-    const TaskID& taskId = task->task_id();
-    const FrameworkID& frameworkId = task->framework_id();
+  void registerSlave(
+      const process::UPID& from,
+      const SlaveInfo& slaveInfo,
+      const std::vector<Resource>& checkpointedResources,
+      const std::string& version);
 
-    CHECK(!tasks[frameworkId].contains(taskId))
-      << "Duplicate task " << taskId << " of framework " << frameworkId;
+  void reregisterSlave(
+      const process::UPID& from,
+      const SlaveInfo& slaveInfo,
+      const std::vector<Resource>& checkpointedResources,
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks,
+      const std::vector<Archive::Framework>& completedFrameworks,
+      const std::string& version);
 
-    tasks[frameworkId][taskId] = task;
+  void unregisterSlave(
+      const process::UPID& from,
+      const SlaveID& slaveId);
 
-    if (!protobuf::isTerminalState(task->state())) {
-      usedResources[frameworkId] += task->resources();
-    }
+  void statusUpdate(
+      const StatusUpdate& update,
+      const process::UPID& pid);
+
+  void reconcileTasks(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const std::vector<TaskStatus>& statuses);
+
+  void exitedExecutor(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      int32_t status);
+
+  void shutdownSlave(
+      const SlaveID& slaveId,
+      const std::string& message);
+
+  void authenticate(
+      const process::UPID& from,
+      const process::UPID& pid);
+
+  // TODO(bmahler): It would be preferred to use a unique libprocess
+  // Process identifier (PID is not sufficient) for identifying the
+  // framework instance, rather than relying on re-registration time.
+  void frameworkFailoverTimeout(
+      const FrameworkID& frameworkId,
+      const process::Time& reregisteredTime);
+
+  void offer(
+      const FrameworkID& framework,
+      const hashmap<SlaveID, Resources>& resources);
+
+  // Invoked when there is a newly elected leading master.
+  // Made public for testing purposes.
+  void detected(const process::Future<Option<MasterInfo>>& pid);
+
+  // Invoked when the contender has lost the candidacy.
+  // Made public for testing purposes.
+  void lostCandidacy(const process::Future<Nothing>& lost);
+
+  // Continuation of recover().
+  // Made public for testing purposes.
+  process::Future<Nothing> _recover(const Registry& registry);
+
+  // Continuation of reregisterSlave().
+  // Made public for testing purposes.
+  // TODO(vinod): Instead of doing this create and use a
+  // MockRegistrar.
+  // TODO(dhamon): Consider FRIEND_TEST macro from gtest.
+  void _reregisterSlave(
+      const SlaveInfo& slaveInfo,
+      const process::UPID& pid,
+      const std::vector<Resource>& checkpointedResources,
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks,
+      const std::vector<Archive::Framework>& completedFrameworks,
+      const std::string& version,
+      const process::Future<bool>& readmit);
+
+  MasterInfo info() const
+  {
+    return info_;
+  }
+
+protected:
+  virtual void initialize();
+  virtual void finalize();
+  virtual void exited(const process::UPID& pid);
+  virtual void visit(const process::MessageEvent& event);
+  virtual void visit(const process::ExitedEvent& event);
 
-    LOG(INFO) << "Adding task " << taskId
-              << " with resources " << task->resources()
-              << " on slave " << id << " (" << info.hostname() << ")";
-  }
+  // Invoked when the message is ready to be executed after
+  // being throttled.
+  // 'principal' being None indicates it is throttled by
+  // 'defaultLimiter'.
+  void throttled(
+      const process::MessageEvent& event,
+      const Option<std::string>& principal);
 
-  // Notification of task termination, for resource accounting.
-  // TODO(bmahler): This is a hack for performance. We need to
-  // maintain resource counters because computing task resources
-  // functionally for all tasks is expensive, for now.
-  void taskTerminated(Task* task)
-  {
-    const TaskID& taskId = task->task_id();
-    const FrameworkID& frameworkId = task->framework_id();
+  // Continuations of visit().
+  void _visit(const process::MessageEvent& event);
+  void _visit(const process::ExitedEvent& event);
 
-    CHECK(protobuf::isTerminalState(task->state()));
-    CHECK(tasks[frameworkId].contains(taskId))
-      << "Unknown task " << taskId << " of framework " << frameworkId;
+  // Helper method invoked when the capacity for a framework
+  // principal is exceeded.
+  void exceededCapacity(
+      const process::MessageEvent& event,
+      const Option<std::string>& principal,
+      uint64_t capacity);
 
-    usedResources[frameworkId] -= task->resources();
-    if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
-      usedResources.erase(frameworkId);
-    }
-  }
+  // Recovers state from the registrar.
+  process::Future<Nothing> recover();
+  void recoveredSlavesTimeout(const Registry& registry);
 
-  void removeTask(Task* task)
-  {
-    const TaskID& taskId = task->task_id();
-    const FrameworkID& frameworkId = task->framework_id();
+  void _registerSlave(
+      const SlaveInfo& slaveInfo,
+      const process::UPID& pid,
+      const std::vector<Resource>& checkpointedResources,
+      const std::string& version,
+      const process::Future<bool>& admit);
 
-    CHECK(tasks[frameworkId].contains(taskId))
-      << "Unknown task " << taskId << " of framework " << frameworkId;
+  void __reregisterSlave(
+      Slave* slave,
+      const std::vector<Task>& tasks);
 
-    if (!protobuf::isTerminalState(task->state())) {
-      usedResources[frameworkId] -= task->resources();
-      if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
-        usedResources.erase(frameworkId);
-      }
-    }
+  // 'authenticate' is the future returned by the authenticator.
+  void _authenticate(
+      const process::UPID& pid,
+      const process::Future<Option<std::string>>& authenticate);
 
-    tasks[frameworkId].erase(taskId);
-    if (tasks[frameworkId].empty()) {
-      tasks.erase(frameworkId);
-    }
+  void authenticationTimeout(process::Future<Option<std::string>> future);
 
-    killedTasks.remove(frameworkId, taskId);
-  }
+  void fileAttached(const process::Future<Nothing>& result,
+                    const std::string& path);
 
-  void addOffer(Offer* offer)
-  {
-    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+  // Invoked when the contender has entered the contest.
+  void contended(const process::Future<process::Future<Nothing>>& candidacy);
 
-    offers.insert(offer);
-    offeredResources += offer->resources();
-  }
+  // Task reconciliation, split from the message handler
+  // to allow re-use.
+  void _reconcileTasks(
+      Framework* framework,
+      const std::vector<TaskStatus>& statuses);
 
-  void removeOffer(Offer* offer)
-  {
-    CHECK(offers.contains(offer)) << "Unknown offer " << offer->id();
+  // Handles a known re-registering slave by reconciling the master's
+  // view of the slave's tasks and executors.
+  void reconcile(
+      Slave* slave,
+      const std::vector<ExecutorInfo>& executors,
+      const std::vector<Task>& tasks);
 
-    offeredResources -= offer->resources();
-    offers.erase(offer);
-  }
+  // 'registerFramework()' continuation.
+  void _registerFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      const process::Future<Option<Error>>& validationError);
 
-  bool hasExecutor(const FrameworkID& frameworkId,
-                   const ExecutorID& executorId) const
-  {
-    return executors.contains(frameworkId) &&
-      executors.get(frameworkId).get().contains(executorId);
-  }
+  // 'reregisterFramework()' continuation.
+  void _reregisterFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      bool failover,
+      const process::Future<Option<Error>>& validationError);
 
-  void addExecutor(const FrameworkID& frameworkId,
-                   const ExecutorInfo& executorInfo)
-  {
-    CHECK(!hasExecutor(frameworkId, executorInfo.executor_id()))
-      << "Duplicate executor " << executorInfo.executor_id()
-      << " of framework " << frameworkId;
+  // Add a framework.
+  void addFramework(Framework* framework);
 
-    executors[frameworkId][executorInfo.executor_id()] = executorInfo;
-    usedResources[frameworkId] += executorInfo.resources();
-  }
+  // Replace the scheduler for a framework with a new process ID, in
+  // the event of a scheduler failover.
+  void failoverFramework(Framework* framework, const process::UPID& newPid);
 
-  void removeExecutor(const FrameworkID& frameworkId,
-                      const ExecutorID& executorId)
-  {
-    CHECK(hasExecutor(frameworkId, executorId))
-      << "Unknown executor " << executorId << " of framework " << frameworkId;
+  // Kill all of a framework's tasks, delete the framework object, and
+  // reschedule offers that were assigned to this framework.
+  void removeFramework(Framework* framework);
 
-    usedResources[frameworkId] -=
-      executors[frameworkId][executorId].resources();
+  // Remove a framework from the slave, i.e., remove its tasks and
+  // executors and recover the resources.
+  void removeFramework(Slave* slave, Framework* framework);
 
-    // XXX Remove.
+  void disconnect(Framework* framework);
+  void deactivate(Framework* framework);
 
-    executors[frameworkId].erase(executorId);
-    if (executors[frameworkId].empty()) {
-      executors.erase(frameworkId);
-    }
-  }
+  void disconnect(Slave* slave);
+  void deactivate(Slave* slave);
 
-  void apply(const Offer::Operation& operation)
-  {
-    Try<Resources> resources = totalResources.apply(operation);
-    CHECK_SOME(resources);
+  // Add a slave.
+  void addSlave(
+      Slave* slave,
+      const std::vector<Archive::Framework>& completedFrameworks =
+        std::vector<Archive::Framework>());
 
-    totalResources = resources.get();
-    checkpointedResources = totalResources.filter(needCheckpointing);
-  }
+  // Remove the slave from the registrar. Called when the slave
+  // does not re-register in time after a master failover.
+  Nothing removeSlave(const Registry::Slave& slave);
 
-  const SlaveID id;
-  const SlaveInfo info;
+  // Remove the slave from the registrar and from the master's state.
+  //
+  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
+  void removeSlave(
+      Slave* slave,
+      const std::string& message,
+      Option<process::metrics::Counter> reason = None());
 
-  process::UPID pid;
+  void _removeSlave(
+      const SlaveInfo& slaveInfo,
+      const std::vector<StatusUpdate>& updates,
+      const process::Future<bool>& removed,
+      const std::string& message,
+      Option<process::metrics::Counter> reason = None());
 
-  // The Mesos version of the slave. If set, the slave is >= 0.21.0.
-  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
-  // TODO(bmahler): Make this required once it is always set.
-  const Option<std::string> version;
+  // Authorizes the task.
+  // Returns true if task is authorized.
+  // Returns false if task is not authorized.
+  // Returns failure for transient authorization failures.
+  process::Future<bool> authorizeTask(
+      const TaskInfo& task,
+      Framework* framework);
 
-  process::Time registeredTime;
-  Option<process::Time> reregisteredTime;
+  // Add the task and its executor (if not already running) to the
+  // framework and slave. Returns the resources consumed as a result,
+  // which includes resources for the task and its executor
+  // (if not already running).
+  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
 
-  // Slave becomes disconnected when the socket closes.
-  bool connected;
+  // Transitions the task, and recovers resources if the task becomes
+  // terminal.
+  void updateTask(Task* task, const StatusUpdate& update);
 
-  // Slave becomes deactivated when it gets disconnected. In the
-  // future this might also happen via HTTP endpoint.
-  // No offers will be made for a deactivated slave.
-  bool active;
+  // Removes the task.
+  void removeTask(Task* task);
 
-  // Executors running on this slave.
-  hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
+  // Remove an executor and recover its resources.
+  void removeExecutor(
+      Slave* slave,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId);
 
-  // Tasks present on this slave.
-  // TODO(bmahler): The task pointer ownership complexity arises from the fact
-  // that we own the pointer here, but it's shared with the Framework struct.
-  // We should find a way to eliminate this.
-  hashmap<FrameworkID, hashmap<TaskID, Task*>> tasks;
+  // Updates slave's resources by applying the given operation. It
+  // also updates the allocator and sends a CheckpointResourcesMessage
+  // to the slave with slave's current checkpointed resources.
+  void applyOfferOperation(
+      Framework* framework,
+      Slave* slave,
+      const Offer::Operation& operation);
 
-  // Tasks that were asked to kill by frameworks.
-  // This is used for reconciliation when the slave re-registers.
-  multihashmap<FrameworkID, TaskID> killedTasks;
+  // Forwards the update to the framework.
+  void forward(
+      const StatusUpdate& update,
+      const process::UPID& acknowledgee,
+      Framework* framework);
 
-  // Active offers on this slave.
-  hashset<Offer*> offers;
+  // Remove an offer after the specified timeout.
+  void offerTimeout(const OfferID& offerId);
 
-  hashmap<FrameworkID, Resources> usedResources;  // Active task / executors.
-  Resources offeredResources; // Offers.
+  // Remove an offer and optionally rescind the offer as well.
+  void removeOffer(Offer* offer, bool rescind = false);
 
-  // Resources that should be checkpointed by the slave (e.g.,
-  // persistent volumes, dynamic reservations, etc). These are either
-  // in use by a task/executor, or are available for use and will be
-  // re-offered to the framework.
-  Resources checkpointedResources;
+  Framework* getFramework(const FrameworkID& frameworkId);
+  Slave* getSlave(const SlaveID& slaveId);
+  Offer* getOffer(const OfferID& offerId);
 
-  // The current total resources of the slave. Note that this is
-  // different from 'info.resources()' because this also consider
-  // operations (e.g., CREATE, RESERVE) that have been applied.
-  Resources totalResources;
+  FrameworkID newFrameworkId();
+  OfferID newOfferId();
+  SlaveID newSlaveId();
 
-  SlaveObserver* observer;
+  Option<Credentials> credentials;
 
 private:
-  Slave(const Slave&);              // No copying.
-  Slave& operator = (const Slave&); // No assigning.
-};
+  void drop(
+      const process::UPID& from,
+      const scheduler::Call& call,
+      const std::string& message);
 
+  void drop(
+      Framework* framework,
+      const Offer::Operation& operation,
+      const std::string& message);
 
-inline std::ostream& operator << (std::ostream& stream, const Slave& slave)
-{
-  return stream << slave.id << " at " << slave.pid
-                << " (" << slave.info.hostname() << ")";
-}
+  // Call handlers.
+  void receive(
+      const process::UPID& from,
+      const scheduler::Call& call);
+
+  void accept(
+      Framework* framework,
+      const scheduler::Call::Accept& accept);
 
+  void _accept(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& offeredResources,
+    const scheduler::Call::Accept& accept,
+    const process::Future<std::list<process::Future<bool>>>& authorizations);
 
-// Information about a connected or completed framework.
-// TODO(bmahler): Keeping the task and executor information in sync
-// across the Slave and Framework structs is error prone!
-struct Framework
-{
-  Framework(const FrameworkInfo& _info,
-            const process::UPID& _pid,
-            const process::Time& time = process::Clock::now())
-    : info(_info),
-      pid(_pid),
-      connected(true),
-      active(true),
-      registeredTime(time),
-      reregisteredTime(time),
-      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
+  void reconcile(
+      Framework* framework,
+      const scheduler::Call::Reconcile& reconcile);
 
-  ~Framework() {}
+  void kill(
+      Framework* framework,
+      const scheduler::Call::Kill& kill);
 
-  Task* getTask(const TaskID& taskId)
+  void shutdown(
+      Framework* framework,
+      const scheduler::Call::Shutdown& shutdown);
+
+  bool elected() const
   {
-    if (tasks.count(taskId) > 0) {
-      return tasks[taskId];
-    } else {
-      return NULL;
-    }
+    return leader.isSome() && leader.get() == info_;
   }
 
-  void addTask(Task* task)
+  // Inner class used to namespace HTTP route handlers (see
+  // master/http.cpp for implementations).
+  class Http
   {
-    CHECK(!tasks.contains(task->task_id()))
-      << "Duplicate task " << task->task_id()
-      << " of framework " << task->framework_id();
+  public:
+    explicit Http(Master* _master) : master(_master) {}
 
-    tasks[task->task_id()] = task;
+    // Logs the request, route handlers can compose this with the
+    // desired request handler to get consistent request logging.
+    static void log(const process::http::Request& request);
 
-    if (!protobuf::isTerminalState(task->state())) {
-      totalUsedResources += task->resources();
-      usedResources[task->slave_id()] += task->resources();
-    }
-  }
+    // /master/health
+    process::Future<process::http::Response> health(
+        const process::http::Request& request) const;
 
-  // Notification of task termination, for resource accounting.
-  // TODO(bmahler): This is a hack for performance. We need to
-  // maintain resource counters because computing task resources
-  // functionally for all tasks is expensive, for now.
-  void taskTerminated(Task* task)
-  {
-    CHECK(protobuf::isTerminalState(task->state()));
-    CHECK(tasks.contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
+    // /master/observe
+    process::Future<process::http::Response> observe(
+        const process::http::Request& request) const;
 
-    totalUsedResources -= task->resources();
-    usedResources[task->slave_id()] -= task->resources();
-    if (usedResources[task->slave_id()].empty()) {
-      usedResources.erase(task->slave_id());
-    }
-  }
+    // /master/redirect
+    process::Future<process::http::Response> redirect(
+        const process::http::Request& request) const;
 
-  void addCompletedTask(const Task& task)
-  {
-    // TODO(adam-mesos): Check if completed task already exists.
-    completedTasks.push_back(std::shared_ptr<Task>(new Task(task)));
-  }
+    // /master/roles.json
+    process::Future<process::http::Response> roles(
+        const process::http::Request& request) const;
 
-  void removeTask(Task* task)
-  {
-    CHECK(tasks.contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
+    // /master/teardown and /master/shutdown (deprecated).
+    process::Future<process::http::Response> teardown(
+        const process::http::Request& request) const;
 
-    if (!protobuf::isTerminalState(task->state())) {
-      totalUsedResources -= task->resources();
-      usedResources[task->slave_id()] -= task->resources();
-      if (usedResources[task->slave_id()].empty()) {
-        usedResources.erase(task->slave_id());
-      }
-    }
+    // /master/slaves
+    process::Future<process::http::Response> slaves(
+        const process::http::Request& request) const;
 
-    addCompletedTask(*task);
+    // /master/state.json
+    process::Future<process::http::Response> state(
+        const process::http::Request& request) const;
 
-    tasks.erase(task->task_id());
-  }
+    // /master/state-summary
+    process::Future<process::http::Response> stateSummary(
+        const process::http::Request& request) const;
 
-  void addOffer(Offer* offer)
-  {
-    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
-    offers.insert(offer);
-    totalOfferedResources += offer->resources();
-    offeredResources[offer->slave_id()] += offer->resources();
-  }
+    // /master/tasks.json
+    process::Future<process::http::Response> tasks(
+        const process::http::Request& request) const;
 
-  void removeOffer(Offer* offer)
-  {
-    CHECK(offers.find(offer) != offers.end())
-      << "Unknown offer " << offer->id();
+    const static std::string HEALTH_HELP;
+    const static std::string OBSERVE_HELP;
+    const static std::string REDIRECT_HELP;
+    const static std::string SHUTDOWN_HELP;  // Deprecated.
+    const static std::string TEARDOWN_HELP;
+    const static std::string SLAVES_HELP;
+    const static std::string TASKS_HELP;
 
-    totalOfferedResources -= offer->resources();
-    offeredResources[offer->slave_id()] -= offer->resources();
-    if (offeredResources[offer->slave_id()].empty()) {
-      offeredResources.erase(offer->slave_id());
-    }
+  private:
+    // Helper for doing authentication, returns the credential used if
+    // the authentication was successful (or none if no credentials
+    // have been given to the master), otherwise an Error.
+    Result<Credential> authenticate(
+        const process::http::Request& request) const;
 
-    offers.erase(offer);
-  }
+    // Continuations.
+    process::Future<process::http::Response> _teardown(
+        const FrameworkID& id,
+        bool authorized = true) const;
 
-  bool hasExecutor(const SlaveID& slaveId,
-                   const ExecutorID& executorId)
-  {
-    return executors.contains(slaveId) &&
-      executors[slaveId].contains(executorId);
-  }
+    Master* master;
+  };
 
-  void addExecutor(const SlaveID& slaveId,
-                   const ExecutorInfo& executorInfo)
-  {
-    CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
-      << "Duplicate executor " << executorInfo.executor_id()
-      << " on slave " << slaveId;
+  Master(const Master&);              // No copying.
+  Master& operator = (const Master&); // No assigning.
 
-    executors[slaveId][executorInfo.executor_id()] = executorInfo;
-    totalUsedResources += executorInfo.resources();
-    usedResources[slaveId] += executorInfo.resources();
-  }
+  friend struct Metrics;
 
-  void removeExecutor(const SlaveID& slaveId,
-                      const ExecutorID& executorId)
-  {
-    CHECK(hasExecutor(slaveId, executorId))
-      << "Unknown executor " << executorId
-      << " of framework " << id()
-      << " of slave " << slaveId;
+  // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
+  // make the following functions friends so that validation functions
+  // can get Offer* and Slave*.
+  friend Offer* validation::offer::getOffer(
+      Master* master, const OfferID& offerId);
 
-    totalUsedResources -= executors[slaveId][executorId].resources();
-    usedResources[slaveId] -= executors[slaveId][executorId].resources();
-    if (usedResources[slaveId].empty()) {
-      usedResources.erase(slaveId);
-    }
+  friend Slave* validation::offer::getSlave(
+      Master* master, const SlaveID& slaveId);
 
-    executors[slaveId].erase(executorId);
-    if (executors[slaveId].empty()) {
-      executors.erase(slaveId);
-    }
-  }
+  const Flags flags;
 
-  const FrameworkID id() const { return info.id(); }
+  Option<MasterInfo> leader; // Current leading master.
 
-  // Update fields in 'info' using those in 'source'. Currently this
-  // only updates 'name', 'failover_timeout', 'hostname', and
-  // 'webui_url'.
-  void updateFrameworkInfo(const FrameworkInfo& source)
-  {
-    // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because
-    // of MESOS-2559. Once this is fixed we can 'CHECK' that we only
-    // merge 'info' from the same framework 'id'.
+  mesos::master::allocator::Allocator* allocator;
+  WhitelistWatcher* whitelistWatcher;
+  Registrar* registrar;
+  Repairer* repairer;
+  Files* files;
+
+  MasterContender* contender;
+  MasterDetector* detector;
+
+  const Option<Authorizer*> authorizer;
+
+  MasterInfo info_;
 
-    // TODO(jmlvanre): Merge other fields as per design doc in
-    // MESOS-703.
+  // Indicates when recovery is complete. Recovery begins once the
+  // master is elected as a leader.
+  Option<process::Future<Nothing>> recovered;
 
-    if (source.user() != info.user()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user()
-                   << "' for framework " << id() << ". Check MESOS-703";
-    }
+  struct Slaves
+  {
+    Slaves() : removed(MAX_REMOVED_SLAVES) {}
 
-    info.set_name(source.name());
+    // Imposes a time limit for slaves that we recover from the
+    // registry to re-register with the master.
+    Option<process::Timer> recoveredTimer;
 
-    if (source.has_failover_timeout()) {
-      info.set_failover_timeout(source.failover_timeout());
-    } else {
-      info.clear_failover_timeout();
-    }
+    // Slaves that have been recovered from the registrar but have yet
+    // to re-register. We keep the 'recoveredTimer' above to ensure
+    // we remove these slaves if they do not re-register.
+    hashset<SlaveID> recovered;
 
-    if (source.checkpoint() != info.checkpoint()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '"
-                   << stringify(info.checkpoint()) << "' for framework " << id()
-                   << ". Check MESOS-703";
-    }
+    // Slaves that are in the process of registering.
+    hashset<process::UPID> registering;
 
-    if (source.role() != info.role()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role()
-                   << "' for framework " << id() << ". Check MESOS-703";
-    }
+    // Only those slaves that are re-registering for the first time
+    // with this master. We must not answer questions related to
+    // these slaves until the registrar determines their fate.
+    hashset<SlaveID> reregistering;
 
-    if (source.has_hostname()) {
-      info.set_hostname(source.hostname());
-    } else {
-      info.clear_hostname();
-    }
+    hashmap<SlaveID, Slave*> registered;
 
-    if (source.principal() != info.principal()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.principal to '"
-                   << info.principal() << "' for framework " << id()
-                   << ". Check MESOS-703";
-    }
+    // Slaves that are in the process of being removed from the
+    // registrar. Think of these as being partially removed: we must
+    // not answer questions related to these until they are removed
+    // from the registry.
+    hashset<SlaveID> removing;
 
-    if (source.has_webui_url()) {
-      info.set_webui_url(source.webui_url());
-    } else {
-      info.clear_webui_url();
-    }
-  }
+    // We track removed slaves to preserve the consistency
+    // semantics of the pre-registrar code when a non-strict registrar
+    // is being used. That is, if we remove a slave, we must make
+    // an effort to prevent it from (re-)registering, sending updates,
+    // etc. We keep a cache here to prevent this from growing in an
+    // unbounded manner.
+    // TODO(bmahler): Ideally we could use a cache with set semantics.
+    Cache<SlaveID, Nothing> removed;
 
-  FrameworkInfo info;
+    // This rate limiter is used to limit the removal of slaves failing
+    // health checks.
+    // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+    // a wrapper around libprocess process which is thread safe.
+    Option<std::shared_ptr<process::RateLimiter>> limiter;
 
-  process::UPID pid;
+    bool transitioning(const Option<SlaveID>& slaveId)
+    {
+      if (slaveId.isSome()) {
+        return recovered.contains(slaveId.get()) ||
+               reregistering.contains(slaveId.get()) ||
+               removing.contains(slaveId.get());
+      } else {
+        return !recovered.empty() ||
+               !reregistering.empty() ||
+               !removing.empty();
+      }
+    }
+  } slaves;
 
-  // Framework becomes disconnected when the socket closes.
-  bool connected;
+  struct Frameworks
+  {
+    Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
 
-  // Framework becomes deactivated when it is disconnected or
-  // the master receives a DeactivateFrameworkMessage.
-  // No offers will be made to a deactivated framework.
-  bool active;
+    hashmap<FrameworkID, Framework*> registered;
+    boost::circular_buffer<std::shared_ptr<Framework>> completed;
 
-  process::Time registeredTime;
-  process::Time reregisteredTime;
-  process::Time unregisteredTime;
+    // Principals of frameworks keyed by PID.
+    // NOTE: Multiple PIDs can map to the same principal. The
+    // principal is None when the framework doesn't specify it.
+    // The differences between this map and 'authenticated' are:
+    // 1) This map only includes *registered* frameworks. The mapping
+    //    is added when a framework (re-)registers.
+    // 2) This map includes unauthenticated frameworks (when Master
+    //    allows them) if they have principals specified in
+    //    FrameworkInfo.
+    hashmap<process::UPID, Option<std::string>> principals;
 
-  // Tasks that have not yet been launched because they are currently
-  // being authorized.
-  hashmap<TaskID, TaskInfo> pendingTasks;
+    // BoundedRateLimiters keyed by the framework principal.
+    // Like Metrics::Frameworks, all frameworks of the same principal
+    // are throttled together at a common rate limit.
+    hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
 
-  hashmap<TaskID, Task*> tasks;
+    // The default limiter is for frameworks not specified in
+    // 'flags.rate_limits'.
+    Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
+  } frameworks;
 
-  // NOTE: We use a shared pointer for Task because clang doesn't like
-  // Boost's implementation of circular_buffer with Task (Boost
-  // attempts to do some memset's which are unsafe).
-  boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
+  hashmap<OfferID, Offer*> offers;
+  hashmap<OfferID, process::Timer> offerTimers;
 
-  hashset<Offer*> offers; // Active offers for framework.
+  hashmap<std::string, Role*> roles;
 
-  hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
+  // Authenticator names as supplied via flags.
+  std::vector<std::string> authenticatorNames;
 
-  // NOTE: For the used and offered resources below, we keep the
-  // total as well as partitioned by SlaveID.
-  // We expose the total resources via the HTTP endpoint, and we
-  // keep a running total of the resources because looping over the
-  // slaves to sum the resources has led to perf issues (MESOS-1862).
-  // We keep the resources partitioned by SlaveID because non-scalar
-  // resources can be lost when summing them up across multiple
-  // slaves (MESOS-2373).
-  //
-  // Also note that keeping the totals is safe even though it yields
-  // incorrect results for non-scalar resources.
-  //   (1) For overlapping set items / ranges across slaves, these
-  //       will get added N times but only represented once.
-  //   (2) When an initial subtraction occurs (N-1), the resource is
-  //       no longer represented. (This is the source of the bug).
-  //   (3) When any further subtractions occur (N-(1+M)), the
-  //       Resources simply ignores the subtraction since there's
-  //       nothing to remove, so this is safe for now.
+  Option<Authenticator*> authenticator;
 
-  // TODO(mpark): Strip the non-scalar resources out of the totals
-  // in order to avoid reporting incorrect statistics (MESOS-2623).
+  // Frameworks/slaves that are currently in the process of authentication.
+  // 'authenticating' future is completed when authenticator
+  // completes authentication.
+  // The future is removed from the map when master completes authentication.
+  hashmap<process::UPID, process::Future<Option<std::string>>> authenticating;
 
-  // Active task / executor resources.
-  Resources totalUsedResources;
-  hashmap<SlaveID, Resources> usedResources;
+  // Principals of authenticated frameworks/slaves keyed by PID.
+  hashmap<process::UPID, std::string> authenticated;
 
-  // Offered resources.
-  Resources totalOfferedResources;
-  hashmap<SlaveID, Resources> offeredResources;
+  int64_t nextFrameworkId; // Used to give each framework a unique ID.
+  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
+  int64_t nextSlaveId;     // Used to give each slave a unique ID.
 
-private:
-  Framework(const Framework&);              // No copying.
-  Framework& operator = (const Framework&); // No assigning.
-};
+  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
+  // thread safe.
+  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
+  // copyable metric types only.
+  std::shared_ptr<Metrics> metrics;
 
+  // Gauge handlers.
+  double _uptime_secs()
+  {
+    return (process::Clock::now() - startTime).secs();
+  }
 
-inline std::ostream& operator << (
-    std::ostream& stream,
-    const Framework& framework)
-{
-  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
-  // updated on framework failover (MESOS-1784).
-  return stream << framework.id() << " (" << framework.info.name()
-                << ") at " << framework.pid;
-}
+  double _elected()
+  {
+    return elected() ? 1 : 0;
+  }
 
+  double _slaves_connected();
+  double _slaves_disconnected();
+  double _slaves_active();
+  double _slaves_inactive();
 
-// Information about an active role.
-struct Role
-{
-  explicit Role(const mesos::master::RoleInfo& _info)
-    : info(_info) {}
+  double _frameworks_connected();
+  double _frameworks_disconnected();
+  double _frameworks_active();
+  double _frameworks_inactive();
 
-  void addFramework(Framework* framework)
+  double _outstanding_offers()
   {
-    frameworks[framework->id()] = framework;
+    return offers.size();
   }
 
-  void removeFramework(Framework* framework)
+  double _event_queue_messages()
   {
-    frameworks.erase(framework->id());
+    return static_cast<double>(eventCount<process::MessageEvent>());
   }
 
-  Resources resources() const
+  double _event_queue_dispatches()
   {
-    Resources resources;
-    foreachvalue (Framework* framework, frameworks) {
-      resources += framework->totalUsedResources;
-      resources += framework->totalOfferedResources;
-    }
+    return static_cast<double>(eventCount<process::DispatchEvent>());
+  }
 
-    return resources;
+  double _event_queue_http_requests()
+  {
+    return static_cast<double>(eventCount<process::HttpEvent>());
   }
 
-  mesos::master::RoleInfo info;
+  double _tasks_staging();
+  double _tasks_starting();
+  double _tasks_running();
 
-  hashmap<FrameworkID, Framework*> frameworks;
+  double _resources_total(const std::string& name);
+  double _resources_used(const std::string& name);
+  double _resources_percent(const std::string& name);
+
+  process::Time startTime; // Start time used to calculate uptime.
+
+  Option<process::Time> electedTime; // Time when this master is elected.
+
+  // Validates the framework including authorization.
+  // Returns None if the framework is valid.
+  // Returns Error if the framework is invalid.
+  // Returns Failure if authorization returns 'Failure'.
+  process::Future<Option<Error>> validate(
+      const FrameworkInfo& frameworkInfo,
+      const process::UPID& from);
 };
 
 


[3/3] mesos git commit: Removed Master::getSlave.

Posted by bm...@apache.org.
Removed Master::getSlave.

Review: https://reviews.apache.org/r/34389


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c24268f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c24268f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c24268f1

Branch: refs/heads/master
Commit: c24268f13a2307ef12bfea794a48786338b422fe
Parents: 42cf03a
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon May 18 18:41:38 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue May 19 11:55:31 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp     | 43 +++++++++++++++++++++---------------------
 src/master/master.hpp     |  6 ++----
 src/master/validation.cpp |  2 +-
 3 files changed, 24 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c24268f1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d2df99c..1526f59 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2403,7 +2403,8 @@ void Master::accept(
   }
 
   CHECK_SOME(slaveId);
-  Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
+  Slave* slave = slaves.registered.get(slaveId.get());
+  CHECK_NOTNULL(slave);
 
   LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
             << " on slave " << *slave << " for framework " << *framework;
@@ -2477,7 +2478,7 @@ void Master::_accept(
     return;
   }
 
-  Slave* slave = getSlave(slaveId);
+  Slave* slave = slaves.registered.get(slaveId);
 
   if (slave == NULL || !slave->connected) {
     foreach (const Offer::Operation& operation, accept.operations()) {
@@ -2870,7 +2871,7 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
     return;
   }
 
-  Slave* slave = getSlave(task->slave_id());
+  Slave* slave = slaves.registered.get(task->slave_id());
   CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
 
   // We add the task to 'killedTasks' here because the slave
@@ -2934,7 +2935,7 @@ void Master::statusUpdateAcknowledgement(
     return;
   }
 
-  Slave* slave = getSlave(slaveId);
+  Slave* slave = slaves.registered.get(slaveId);
 
   if (slave == NULL) {
     LOG(WARNING)
@@ -3030,7 +3031,8 @@ void Master::schedulerMessage(
     return;
   }
 
-  Slave* slave = getSlave(slaveId);
+  Slave* slave = slaves.registered.get(slaveId);
+
   if (slave == NULL) {
     LOG(WARNING) << "Cannot send framework message for framework "
                  << *framework << " to slave " << slaveId
@@ -3256,7 +3258,7 @@ void Master::reregisterSlave(
     return;
   }
 
-  Slave* slave = getSlave(slaveInfo.id());
+  Slave* slave = slaves.registered.get(slaveInfo.id());
 
   if (slave != NULL) {
     slave->reregisteredTime = Clock::now();
@@ -3435,7 +3437,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 
   LOG(INFO) << "Asked to unregister slave " << slaveId;
 
-  Slave* slave = getSlave(slaveId);
+  Slave* slave = slaves.registered.get(slaveId);
 
   if (slave != NULL) {
     if (slave->pid != from) {
@@ -3473,7 +3475,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
     return;
   }
 
-  Slave* slave = getSlave(update.slave_id());
+  Slave* slave = slaves.registered.get(update.slave_id());
 
   if (slave == NULL) {
     LOG(WARNING) << "Ignoring status update " << update
@@ -4457,7 +4459,8 @@ void Master::removeFramework(Framework* framework)
 
   // Remove pointers to the framework's tasks in slaves.
   foreachvalue (Task* task, utils::copy(framework->tasks)) {
-    Slave* slave = getSlave(task->slave_id());
+    Slave* slave = slaves.registered.get(task->slave_id());
+
     // Since we only find out about tasks when the slave re-registers,
     // it must be the case that the slave exists!
     CHECK(slave != NULL)
@@ -4501,7 +4504,8 @@ void Master::removeFramework(Framework* framework)
 
   // Remove the framework's executors for correct resource accounting.
   foreachkey (const SlaveID& slaveId, utils::copy(framework->executors)) {
-    Slave* slave = getSlave(slaveId);
+    Slave* slave = slaves.registered.get(slaveId);
+
     if (slave != NULL) {
       foreachkey (const ExecutorID& executorId,
                   utils::copy(framework->executors[slaveId])) {
@@ -4896,7 +4900,9 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
         None());
 
     // The slave owns the Task object and cannot be NULL.
-    Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+    Slave* slave = slaves.registered.get(task->slave_id());
+    CHECK_NOTNULL(slave);
+
     slave->taskTerminated(task);
 
     Framework* framework = getFramework(task->framework_id());
@@ -4928,7 +4934,8 @@ void Master::removeTask(Task* task)
   CHECK_NOTNULL(task);
 
   // The slave owns the Task object and cannot be NULL.
-  Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+  Slave* slave = slaves.registered.get(task->slave_id());
+  CHECK_NOTNULL(slave);
 
   if (!protobuf::isTerminalState(task->state())) {
     LOG(WARNING) << "Removing task " << task->task_id()
@@ -5040,7 +5047,8 @@ void Master::removeOffer(Offer* offer, bool rescind)
   framework->removeOffer(offer);
 
   // Remove from slave.
-  Slave* slave = getSlave(offer->slave_id());
+  Slave* slave = slaves.registered.get(offer->slave_id());
+
   CHECK(slave != NULL)
     << "Unknown slave " << offer->slave_id()
     << " in the offer " << offer->id();
@@ -5076,15 +5084,6 @@ Framework* Master::getFramework(const FrameworkID& frameworkId)
 
 
 // TODO(bmahler): Consider killing this.
-Slave* Master::getSlave(const SlaveID& slaveId)
-{
-  return slaves.registered.contains(slaveId)
-    ? slaves.registered[slaveId]
-    : NULL;
-}
-
-
-// TODO(bmahler): Consider killing this.
 Offer* Master::getOffer(const OfferID& offerId)
 {
   return offers.contains(offerId) ? offers[offerId] : NULL;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c24268f1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 4a94e23..c8c6251 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -975,7 +975,6 @@ protected:
   void removeOffer(Offer* offer, bool rescind = false);
 
   Framework* getFramework(const FrameworkID& frameworkId);
-  Slave* getSlave(const SlaveID& slaveId);
   Offer* getOffer(const OfferID& offerId);
 
   FrameworkID newFrameworkId();
@@ -1103,9 +1102,8 @@ private:
 
   friend struct Metrics;
 
-  // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
-  // make the following functions friends so that validation functions
-  // can get Offer* and Slave*.
+  // NOTE: Since 'getOffer' and 'slaves' are protected,
+  // we need to make the following functions friends.
   friend Offer* validation::offer::getOffer(
       Master* master, const OfferID& offerId);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c24268f1/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 20a6ac8..1793b0e 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -447,7 +447,7 @@ Offer* getOffer(Master* master, const OfferID& offerId)
 Slave* getSlave(Master* master, const SlaveID& slaveId)
 {
   CHECK_NOTNULL(master);
-  return master->getSlave(slaveId);
+  return master->slaves.registered.get(slaveId);
 }
 
 


Re: [1/3] mesos git commit: Index slaves by UPID in the master.

Posted by James Peach <jo...@gmail.com>.
> On May 19, 2015, at 12:22 PM, bmahler@apache.org wrote:
> 
> Repository: mesos
> Updated Branches:
>  refs/heads/master 26091f461 -> c24268f13
> 
> 
> Index slaves by UPID in the master.
> 
> Review: https://reviews.apache.org/r/34388
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cf03af
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cf03af
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cf03af
> 
> Branch: refs/heads/master
> Commit: 42cf03af66f2691d04e5c88ac7e098625d38e0bf
> Parents: b19ffd2
> Author: Benjamin Mahler <be...@gmail.com>
> Authored: Mon May 18 18:37:11 2015 -0700
> Committer: Benjamin Mahler <be...@gmail.com>
> Committed: Tue May 19 11:55:30 2015 -0700
> 
> ----------------------------------------------------------------------
> src/master/master.cpp | 127 +++++++++++++++++++++++----------------------
> src/master/master.hpp |  65 ++++++++++++++++++++++-
> 2 files changed, 129 insertions(+), 63 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.cpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.cpp b/src/master/master.cpp
> index eaea79d..d2df99c 100644
> --- a/src/master/master.cpp
> +++ b/src/master/master.cpp
> @@ -973,7 +973,9 @@ void Master::exited(const UPID& pid)
>     }
>   }
> 
> -  // The semantics when a slave gets disconnected are as follows:
> +  // The semantics when a registered slave gets disconnected are as
> +  // follows:
> +  //
>   // 1) If the slave is not checkpointing, the slave is immediately
>   //    removed and all tasks running on it are transitioned to LOST.
>   //    No resources are recovered, because the slave is removed.
> @@ -985,42 +987,42 @@ void Master::exited(const UPID& pid)
>   //    2.2) Framework is not-checkpointing: The slave is not removed
>   //         but the framework is removed from the slave's structs,
>   //         its tasks transitioned to LOST and resources recovered.
> -  foreachvalue (Slave* slave, slaves.registered) {
> -    if (slave->pid == pid) {
> -      LOG(INFO) << "Slave " << *slave << " disconnected";
> -
> -      if (!slave->info.checkpoint()) {
> -        // Remove the slave, if it is not checkpointing.
> -        LOG(INFO) << "Removing disconnected slave " << *slave
> -                  << " because it is not checkpointing!";
> -        removeSlave(slave,
> -                    "slave is non-checkpointing and disconnected");
> -        return;
> -      } else if (slave->connected) {
> -        // Checkpointing slaves can just be disconnected.
> -        disconnect(slave);
> +  if (slaves.registered.contains(pid)) {
> +    Slave* slave = slaves.registered.get(pid);
> +    CHECK_NOTNULL(slave);

Would it be better to fold the contains() check and the lookup into a single statement, like this:
	if (Slave* slave = slaves.registered.get(pid)) {

J