You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/02/15 01:11:31 UTC

git commit: Small cleanups of the Master code.

Repository: mesos
Updated Branches:
  refs/heads/master 311743f6b -> 765c938f5


Small cleanups of the Master code.

Review: https://reviews.apache.org/r/15114


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/765c938f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/765c938f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/765c938f

Branch: refs/heads/master
Commit: 765c938f59a680adb3c38c90ad3bd1138c17926c
Parents: 311743f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Oct 30 22:50:38 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Feb 14 15:48:51 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 432 +++++++++++++++++++++++----------------------
 src/master/master.hpp |   7 +-
 2 files changed, 229 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/765c938f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a4e1b1f..f24df23 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -242,79 +242,7 @@ Master::Master(
 }
 
 
-Master::~Master()
-{
-  LOG(INFO) << "Shutting down master";
-
-  // Remove the frameworks.
-  // Note we are not deleting the pointers to the frameworks from the
-  // allocator or the roles because it is unnecessary bookkeeping at
-  // this point since we are shutting down.
-  foreachvalue (Framework* framework, frameworks) {
-    // Remove pointers to the framework's tasks in slaves.
-    foreachvalue (Task* task, utils::copy(framework->tasks)) {
-      Slave* slave = getSlave(task->slave_id());
-      // Since we only find out about tasks when the slave re-registers,
-      // it must be the case that the slave exists!
-      CHECK(slave != NULL)
-        << "Unknown slave " << task->slave_id()
-        << " in the task " << task->task_id();
-
-      removeTask(task);
-    }
-
-    // Remove the framework's offers (if they weren't removed before).
-    foreach (Offer* offer, utils::copy(framework->offers)) {
-      removeOffer(offer);
-    }
-
-    delete framework;
-  }
-  frameworks.clear();
-
-  foreachvalue (Future<Nothing> future, authenticating) {
-    // NOTE: This is necessary during tests because a copy of
-    // this future is used to setup authentication timeout. If a
-    // test doesn't discard this future, authentication timeout might
-    // fire in a different test and any associated callbacks
-    // (e.g., '_authenticate()') would be called. This is because the
-    // master pid doesn't change across the tests.
-    // TODO(vinod): This seems to be a bug in libprocess or the
-    // testing infrastructure.
-    future.discard();
-  }
-
-  CHECK_EQ(offers.size(), 0UL);
-
-  foreachvalue (Slave* slave, slaves) {
-    LOG(INFO) << "Removing slave " << slave->id
-              << " (" << slave->info.hostname() << ")";
-
-    // Remove tasks that are in the slave but not in any framework.
-    // This could happen when the framework has yet to reregister
-    // after master failover.
-    // NOTE: keys() and values() are used because slave->tasks is
-    //       modified by removeTask()!
-    foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
-      foreach (Task* task, slave->tasks[frameworkId].values()) {
-        removeTask(task);
-      }
-    }
-
-    // Kill the slave observer.
-    terminate(slave->observer);
-    wait(slave->observer);
-
-    delete slave->observer;
-    delete slave;
-  }
-  slaves.clear();
-
-  terminate(whitelistWatcher);
-  wait(whitelistWatcher);
-
-  delete whitelistWatcher;
-}
+Master::~Master() {}
 
 
 void Master::initialize()
@@ -585,6 +513,76 @@ void Master::initialize()
 void Master::finalize()
 {
   LOG(INFO) << "Master terminating";
+
+  // Remove the frameworks.
+  // Note we are not deleting the pointers to the frameworks from the
+  // allocator or the roles because it is unnecessary bookkeeping at
+  // this point since we are shutting down.
+  foreachvalue (Framework* framework, frameworks) {
+    // Remove pointers to the framework's tasks in slaves.
+    foreachvalue (Task* task, utils::copy(framework->tasks)) {
+      Slave* slave = getSlave(task->slave_id());
+      // Since we only find out about tasks when the slave re-registers,
+      // it must be the case that the slave exists!
+      CHECK(slave != NULL)
+        << "Unknown slave " << task->slave_id()
+        << " in the task " << task->task_id();
+
+      removeTask(task);
+    }
+
+    // Remove the framework's offers (if they weren't removed before).
+    foreach (Offer* offer, utils::copy(framework->offers)) {
+      removeOffer(offer);
+    }
+
+    delete framework;
+  }
+  frameworks.clear();
+
+  CHECK_EQ(offers.size(), 0UL);
+
+  foreachvalue (Slave* slave, slaves) {
+    // Remove tasks that are in the slave but not in any framework.
+    // This could happen when the framework has yet to re-register
+    // after master failover.
+    // NOTE: keys() and values() are used because slave->tasks is
+    //       modified by removeTask()!
+    foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
+      foreach (Task* task, slave->tasks[frameworkId].values()) {
+        removeTask(task);
+      }
+    }
+
+    // Kill the slave observer.
+    terminate(slave->observer);
+    wait(slave->observer);
+
+    delete slave->observer;
+    delete slave;
+  }
+  slaves.clear();
+
+  foreachvalue (Future<Nothing> future, authenticating) {
+    // NOTE: This is necessary during tests because a copy of
+    // this future is used to set up the authentication timeout. If a
+    // test doesn't discard this future, authentication timeout might
+    // fire in a different test and any associated callbacks
+    // (e.g., '_authenticate()') would be called. This is because the
+    // master pid doesn't change across the tests.
+    // TODO(vinod): This seems to be a bug in libprocess or the
+    // testing infrastructure.
+    future.discard();
+  }
+
+  foreachvalue (Role* role, roles) {
+    delete role;
+  }
+  roles.clear();
+
+  terminate(whitelistWatcher);
+  wait(whitelistWatcher);
+  delete whitelistWatcher;
 }
 
 
@@ -1698,30 +1696,11 @@ void Master::killTask(
   }
 
   Task* task = framework->getTask(taskId);
-  if (task != NULL) {
-    Slave* slave = getSlave(task->slave_id());
-    CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
-
-    // We add the task to 'killedTasks' here because the slave
-    // might be partitioned or disconnected but the master
-    // doesn't know it yet.
-    slave->killedTasks.put(frameworkId, taskId);
-
-    // NOTE: This task will be properly reconciled when the
-    // disconnected slave re-registers with the master.
-    if (!slave->disconnected) {
-      LOG(INFO) << "Telling slave " << slave->id << " ("
-                << slave->info.hostname() << ")"
-                << " to kill task " << taskId
-                << " of framework " << frameworkId;
+  if (task == NULL) {
+    // TODO(bmahler): This is incorrect in some cases, see:
+    // https://issues.apache.org/jira/browse/MESOS-783
 
-      KillTaskMessage message;
-      message.mutable_framework_id()->MergeFrom(frameworkId);
-      message.mutable_task_id()->MergeFrom(taskId);
-      send(slave->pid, message);
-    }
-  } else {
-    // TODO(benh): Once the scheduler has persistance and
+    // TODO(benh): Once the scheduler has persistence and
     // high-availability of it's tasks, it will be the one that
     // determines that this invocation of 'killTask' is silly, and
     // can just return "locally" (i.e., after hitting only the other
@@ -1729,7 +1708,8 @@ void Master::killTask(
 
     LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId
-                 << " because the task cannot be found";
+                 << " because it cannot be found, sending TASK_LOST";
+
     StatusUpdateMessage message;
     StatusUpdate* update = message.mutable_update();
     update->mutable_framework_id()->MergeFrom(frameworkId);
@@ -1740,6 +1720,29 @@ void Master::killTask(
     update->set_timestamp(Clock::now().secs());
     update->set_uuid(UUID::random().toBytes());
     send(framework->pid, message);
+    return;
+  }
+
+  Slave* slave = getSlave(task->slave_id());
+  CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
+
+  // We add the task to 'killedTasks' here because the slave
+  // might be partitioned or disconnected but the master
+  // doesn't know it yet.
+  slave->killedTasks.put(frameworkId, taskId);
+
+  // NOTE: This task will be properly reconciled when the
+  // disconnected slave re-registers with the master.
+  if (!slave->disconnected) {
+    LOG(INFO) << "Telling slave " << slave->id << " ("
+        << slave->info.hostname() << ")"
+        << " to kill task " << taskId
+        << " of framework " << frameworkId;
+
+    KillTaskMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_task_id()->MergeFrom(taskId);
+    send(slave->pid, message);
   }
 }
 
@@ -1773,33 +1776,35 @@ void Master::schedulerMessage(
   }
 
   Slave* slave = getSlave(slaveId);
-  if (slave != NULL) {
-    if (!slave->disconnected) {
-      LOG(INFO) << "Sending framework message for framework "
-                << frameworkId << " to slave " << slaveId
-                << " (" << slave->info.hostname() << ")";
-
-      FrameworkToExecutorMessage message;
-      message.mutable_slave_id()->MergeFrom(slaveId);
-      message.mutable_framework_id()->MergeFrom(frameworkId);
-      message.mutable_executor_id()->MergeFrom(executorId);
-      message.set_data(data);
-      send(slave->pid, message);
-
-      stats.validFrameworkMessages++;
-    } else {
-      LOG(WARNING) << "Cannot send framework message for framework "
-                   << frameworkId << " to slave " << slaveId
-                   << " (" << slave->info.hostname() << ")"
-                   << " because slave is disconnected";
-      stats.invalidFrameworkMessages++;
-    }
-  } else {
+  if (slave == NULL) {
     LOG(WARNING) << "Cannot send framework message for framework "
                  << frameworkId << " to slave " << slaveId
                  << " because slave does not exist";
     stats.invalidFrameworkMessages++;
+    return;
+  }
+
+  if (slave->disconnected) {
+    LOG(WARNING) << "Cannot send framework message for framework "
+                 << frameworkId << " to slave " << slaveId
+                 << " (" << slave->info.hostname() << ")"
+                 << " because slave is disconnected";
+    stats.invalidFrameworkMessages++;
+    return;
   }
+
+  LOG(INFO) << "Sending framework message for framework "
+            << frameworkId << " to slave " << slaveId
+            << " (" << slave->info.hostname() << ")";
+
+  FrameworkToExecutorMessage message;
+  message.mutable_slave_id()->MergeFrom(slaveId);
+  message.mutable_framework_id()->MergeFrom(frameworkId);
+  message.mutable_executor_id()->MergeFrom(executorId);
+  message.set_data(data);
+  send(slave->pid, message);
+
+  stats.validFrameworkMessages++;
 }
 
 
@@ -1858,93 +1863,97 @@ void Master::reregisterSlave(
   }
 
   if (slaveId == "") {
-    LOG(ERROR) << "Slave " << from << " re-registered without an id!";
+    LOG(ERROR) << "Shutting down slave " << from << " that re-registered "
+               << "without an id!";
     reply(ShutdownMessage());
-  } else if (deactivatedSlaves.contains(from)) {
+    return;
+  }
+
+  if (deactivatedSlaves.contains(from)) {
     // We disallow deactivated slaves from re-registering. This is
     // to ensure that when a master deactivates a slave that was
     // partitioned, we don't allow the slave to re-register, as we've
     // already informed frameworks that the tasks were lost.
-    LOG(ERROR) << "Slave " << slaveId << " at " << from
-               << " attempted to re-register after deactivation";
+    LOG(ERROR) << "Shutting down slave " << slaveId << " at " << from
+               << " that attempted to re-register after deactivation";
     reply(ShutdownMessage());
-  } else {
-    Slave* slave = getSlave(slaveId);
-
-    if (slave != NULL) {
-      slave->reregisteredTime = Clock::now();
-
-      // NOTE: This handles the case where a slave tries to
-      // re-register with an existing master (e.g. because of a
-      // spurious Zookeeper session expiration or after the slave
-      // recovers after a restart).
-      // For now, we assume this slave is not nefarious (eventually
-      // this will be handled by orthogonal security measures like key
-      // based authentication).
-      LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
-                   << ") is being allowed to re-register with an already"
-                   << " in use id (" << slaveId << ")";
-
-      // TODO(bmahler): There's an implicit assumption here that when
-      // the master already knows about this slave, the slave cannot
-      // have tasks unknown to the master. This _should_ be the case
-      // since the causal relationship is:
-      //   slave removes task -> master removes task
-      // We should enforce this via a CHECK (dangerous), or by shutting
-      // down slaves that are found to violate this assumption.
-
-      SlaveReregisteredMessage message;
-      message.mutable_slave_id()->MergeFrom(slave->id);
-      reply(message);
-
-      // Update the slave pid and relink to it.
-      // NOTE: Re-linking the slave here always rather than only when
-      // the slave is disconnected can lead to multiple exited events
-      // in succession for a disconnected slave. As a result, we
-      // ignore duplicate exited events for disconnected checkpointing
-      // slaves.
-      // See: https://issues.apache.org/jira/browse/MESOS-675
-      slave->pid = from;
-      link(slave->pid);
-
-      // Reconcile tasks between master and the slave.
-      // NOTE: This needs to be done after the registration message is
-      // sent to the slave and the new pid is linked.
-      reconcile(slave, executorInfos, tasks);
-
-      // If this is a disconnected slave, add it back to the allocator.
-      // This is done after reconciliation to ensure the allocator's
-      // offers include the recovered resources initially on this
-      // slave.
-      if (slave->disconnected) {
-        slave->disconnected = false; // Reset the flag.
-        allocator->slaveReconnected(slaveId);
-      }
-    } else {
-      // NOTE: This handles the case when the slave tries to
-      // re-register with a failed over master.
-      slave = new Slave(slaveInfo, slaveId, from, Clock::now());
-      slave->reregisteredTime = Clock::now();
+    return;
+  }
 
-      LOG(INFO) << "Attempting to re-register slave " << slave->id << " at "
-                << slave->pid << " (" << slave->info.hostname() << ")";
+  Slave* slave = getSlave(slaveId);
+  if (slave != NULL) {
+    slave->reregisteredTime = Clock::now();
+
+    // NOTE: This handles the case where a slave tries to
+    // re-register with an existing master (e.g. because of a
+    // spurious Zookeeper session expiration or after the slave
+    // recovers after a restart).
+    // For now, we assume this slave is not nefarious (eventually
+    // this will be handled by orthogonal security measures like key
+    // based authentication).
+    LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
+                       << ") is being allowed to re-register with an already"
+                       << " in use id (" << slaveId << ")";
+
+    // TODO(bmahler): There's an implicit assumption here that when
+    // the master already knows about this slave, the slave cannot
+    // have tasks unknown to the master. This _should_ be the case
+    // since the causal relationship is:
+    //   slave removes task -> master removes task
+    // We should enforce this via a CHECK (dangerous), or by shutting
+    // down slaves that are found to violate this assumption.
 
-      readdSlave(slave, executorInfos, tasks);
+    SlaveReregisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    reply(message);
+
+    // Update the slave pid and relink to it.
+    // NOTE: Re-linking the slave here always rather than only when
+    // the slave is disconnected can lead to multiple exited events
+    // in succession for a disconnected slave. As a result, we
+    // ignore duplicate exited events for disconnected checkpointing
+    // slaves.
+    // See: https://issues.apache.org/jira/browse/MESOS-675
+    slave->pid = from;
+    link(slave->pid);
+
+    // Reconcile tasks between master and the slave.
+    // NOTE: This needs to be done after the registration message is
+    // sent to the slave and the new pid is linked.
+    reconcile(slave, executorInfos, tasks);
+
+    // If this is a disconnected slave, add it back to the allocator.
+    // This is done after reconciliation to ensure the allocator's
+    // offers include the recovered resources initially on this
+    // slave.
+    if (slave->disconnected) {
+      slave->disconnected = false; // Reset the flag.
+      allocator->slaveReconnected(slaveId);
     }
+  } else {
+    // NOTE: This handles the case when the slave tries to
+    // re-register with a failed over master.
+    slave = new Slave(slaveInfo, slaveId, from, Clock::now());
+    slave->reregisteredTime = Clock::now();
 
-    // Send the latest framework pids to the slave.
-    CHECK_NOTNULL(slave);
-    hashset<UPID> pids;
-    foreach (const Task& task, tasks) {
-      Framework* framework = getFramework(task.framework_id());
-      if (framework != NULL && !pids.contains(framework->pid)) {
-        UpdateFrameworkMessage message;
-        message.mutable_framework_id()->MergeFrom(framework->id);
-        message.set_pid(framework->pid);
-        send(slave->pid, message);
+    LOG(INFO) << "Attempting to re-register slave " << slave->id << " at "
+        << slave->pid << " (" << slave->info.hostname() << ")";
 
-        pids.insert(framework->pid);
-      }
+    readdSlave(slave, executorInfos, tasks);
+  }
+
+  // Send the latest framework pids to the slave.
+  CHECK_NOTNULL(slave);
+  hashset<UPID> pids;
+  foreach (const Task& task, tasks) {
+    Framework* framework = getFramework(task.framework_id());
+    if (framework != NULL && !pids.contains(framework->pid)) {
+      UpdateFrameworkMessage message;
+      message.mutable_framework_id()->MergeFrom(framework->id);
+      message.set_pid(framework->pid);
+      send(slave->pid, message);
+
+      pids.insert(framework->pid);
     }
   }
 }
@@ -1997,22 +2006,15 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
     << " which is deactivated slave " << update.slave_id()
     << "(" << slave->info.hostname() << ")";
 
-  Framework* framework = getFramework(update.framework_id());
-  if (framework == NULL) {
-    LOG(WARNING) << "Ignoring status update " << update
-                 << " from " << pid << " ("
-                 << slave->info.hostname() << "): error, couldn't lookup "
-                 << "framework " << update.framework_id();
+  // Forward the update to the framework.
+  Try<Nothing> _forward = forward(update, pid);
+  if (_forward.isError()) {
+    LOG(WARNING) << "Ignoring status update " << update << " from " << pid
+                 << " (" << slave->info.hostname() << "): " << _forward.error();
     stats.invalidStatusUpdates++;
     return;
   }
 
-  // Pass on the (transformed) status update to the framework.
-  StatusUpdateMessage message;
-  message.mutable_update()->MergeFrom(update);
-  message.set_pid(pid);
-  send(framework->pid, message);
-
   // Lookup the task and see if we need to update anything locally.
   Task* task = slave->getTask(update.framework_id(), status.task_id());
   if (task == NULL) {
@@ -2043,6 +2045,22 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 }
 
 
+Try<Nothing> Master::forward(const StatusUpdate& update, const UPID& pid)
+{
+  Framework* framework = getFramework(update.framework_id());
+  if (framework == NULL) {
+    return Error("Unknown framework " + stringify(update.framework_id()));
+  }
+
+  // Pass on the (transformed) status update to the framework.
+  StatusUpdateMessage message;
+  message.mutable_update()->MergeFrom(update);
+  message.set_pid(pid);
+  send(framework->pid, message);
+  return Nothing();
+}
+
+
 void Master::exitedExecutor(
     const UPID& from,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/765c938f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 737bd8b..00d630a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -254,6 +254,9 @@ protected:
   // Remove a task.
   void removeTask(Task* task);
 
+  // Forwards the update to its framework; returns an Error if it is unknown.
+  Try<Nothing> forward(const StatusUpdate& update, const UPID& pid);
+
   // Remove an offer and optionally rescind the offer as well.
   void removeOffer(Offer* offer, bool rescind = false);
 
@@ -387,7 +390,6 @@ struct Slave
       info(_info),
       pid(_pid),
       registeredTime(time),
-      lastHeartbeat(time),
       disconnected(false),
       observer(NULL) {}
 
@@ -493,7 +495,6 @@ struct Slave
 
   Time registeredTime;
   Option<Time> reregisteredTime;
-  Time lastHeartbeat;
 
   // We mark a slave 'disconnected' when it has checkpointing
   // enabled because we expect it reregister after recovery.
@@ -623,8 +624,8 @@ struct Framework
     }
   }
 
-
   const FrameworkID id; // TODO(benh): Store this in 'info'.
+
   const FrameworkInfo info;
 
   UPID pid;