Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/19 21:07:25 UTC

[1/2] mesos git commit: Formatting cleanup in the Slave.

Repository: mesos
Updated Branches:
  refs/heads/master c2a112f36 -> efeb11837


Formatting cleanup in the Slave.

This patch fixes incorrect formatting as well as formatting that was
acceptable but could be improved.
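
For a sense of the conventions applied, here is a before/after sketch of
the most common fix in this patch (taken from the first hunk below):
when a wrapped call would break after its opening parenthesis, break at
the assignment instead so the call itself stays on one line.

    // Before: broken after the opening parenthesis.
    Try<Nothing> initialize = resourceEstimator->initialize(
        defer(self(), &Self::usage));

    // After: broken at the assignment.
    Try<Nothing> initialize =
      resourceEstimator->initialize(defer(self(), &Self::usage));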

Review: https://reviews.apache.org/r/35635


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ccf6c254
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ccf6c254
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ccf6c254

Branch: refs/heads/master
Commit: ccf6c254a1620e512ec66f1e20644d47c12c6832
Parents: c2a112f
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jun 19 12:03:27 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jun 19 12:03:28 2015 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 209 +++++++++++++++++++----------------------------
 1 file changed, 82 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ccf6c254/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a5ad29f..6c539b5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -326,16 +326,15 @@ void Slave::initialize()
             << "' for --gc_disk_headroom. Must be between 0.0 and 1.0.";
   }
 
-  Try<Nothing> initialize = resourceEstimator->initialize(
-      defer(self(), &Self::usage));
+  Try<Nothing> initialize =
+    resourceEstimator->initialize(defer(self(), &Self::usage));
 
   if (initialize.isError()) {
     EXIT(1) << "Failed to initialize the resource estimator: "
             << initialize.error();
   }
 
-  initialize = qosController->initialize(
-      defer(self(), &Self::usage));
+  initialize = qosController->initialize(defer(self(), &Self::usage));
 
   if (initialize.isError()) {
     EXIT(1) << "Failed to initialize the QoS Controller: "
@@ -512,8 +511,8 @@ void Slave::initialize()
                    lambda::_1,
                    flags.external_log_file.get()));
   } else if (flags.log_dir.isSome()) {
-    Try<string> log = logging::getLogFile(
-        logging::getLogSeverity(flags.logging_level));
+    Try<string> log =
+      logging::getLogFile(logging::getLogSeverity(flags.logging_level));
 
     if (log.isError()) {
       LOG(ERROR) << "Slave log file cannot be found: " << log.error();
@@ -763,10 +762,7 @@ void Slave::authenticate()
     authenticatee->authenticate(master.get(), self(), credential.get())
       .onAny(defer(self(), &Self::_authenticate));
 
-  delay(Seconds(5),
-        self(),
-        &Self::authenticationTimeout,
-        authenticating.get());
+  delay(Seconds(5), self(), &Self::authenticationTimeout, authenticating.get());
 }
 
 
@@ -843,7 +839,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
 
   CHECK_SOME(master);
 
-  switch(state) {
+  switch (state) {
     case DISCONNECTED: {
       LOG(INFO) << "Registered with master " << master.get()
                 << "; given slave ID " << slaveId;
@@ -852,8 +848,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
       // in "fetcher.hpp"".
       Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
       if (recovered.isError()) {
-          LOG(FATAL) << "Could not initialize fetcher cache: "
-                     << recovered.error();
+        LOG(FATAL) << "Could not initialize fetcher cache: "
+                   << recovered.error();
       }
 
       state = RUNNING;
@@ -876,11 +872,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
       // in case we never receive an initial ping.
       Clock::cancel(pingTimer);
 
-      pingTimer = delay(
-          MASTER_PING_TIMEOUT(),
-          self(),
-          &Slave::pingTimeout,
-          detection);
+      pingTimer =
+        delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection);
 
       break;
     }
@@ -935,7 +928,7 @@ void Slave::reregistered(
             << "(expected: " << info.id() << "). Committing suicide";
   }
 
-  switch(state) {
+  switch (state) {
     case DISCONNECTED:
       LOG(INFO) << "Re-registered with master " << master.get();
       state = RUNNING;
@@ -1266,11 +1259,12 @@ void Slave::runTask(
     // executors to this framework and remove it from that list.
     // TODO(brenden): Consider using stout/cache.hpp instead of boost
     // circular_buffer.
-    for (boost::circular_buffer<Owned<Framework>>::iterator i =
-        completedFrameworks.begin(); i != completedFrameworks.end(); ++i) {
-      if ((*i)->id() == frameworkId) {
-        framework->completedExecutors = (*i)->completedExecutors;
-        completedFrameworks.erase(i);
+    for (auto it = completedFrameworks.begin(), end = completedFrameworks.end();
+         it != end;
+         ++it) {
+      if ((*it)->id() == frameworkId) {
+        framework->completedExecutors = (*it)->completedExecutors;
+        completedFrameworks.erase(it);
         break;
       }
     }
@@ -1298,8 +1292,7 @@ void Slave::runTask(
     }
 
     // Unschedule executor meta directory.
-    path = paths::getExecutorPath(
-        metaDir, info.id(), frameworkId, executorId);
+    path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
 
     if (os::exists(path)) {
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
@@ -1308,12 +1301,7 @@ void Slave::runTask(
 
   // Run the task after the unschedules are done.
   unschedule.onAny(
-      defer(self(),
-            &Self::_runTask,
-            lambda::_1,
-            frameworkInfo,
-            pid,
-            task));
+      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task));
 }
 
 
@@ -1330,10 +1318,10 @@ void Slave::_runTask(
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
-     LOG(WARNING) << "Ignoring run task " << task.task_id()
-                  << " because the framework " << frameworkId
-                  << " does not exist";
-     return;
+    LOG(WARNING) << "Ignoring run task " << task.task_id()
+                 << " because the framework " << frameworkId
+                 << " does not exist";
+    return;
   }
 
   const ExecutorInfo& executorInfo = getExecutorInfo(frameworkId, task);
@@ -1343,7 +1331,7 @@ void Slave::_runTask(
       framework->pending[executorId].contains(task.task_id())) {
     framework->pending[executorId].erase(task.task_id());
     if (framework->pending[executorId].empty()) {
-        framework->pending.erase(executorId);
+      framework->pending.erase(executorId);
     }
   } else {
     LOG(WARNING) << "Ignoring run task " << task.task_id()
@@ -1504,14 +1492,13 @@ void Slave::_runTask(
       }
 
       containerizer->update(executor->containerId, resources)
-        .onAny(defer(
-            self(),
-            &Self::runTasks,
-            lambda::_1,
-            frameworkId,
-            executorId,
-            executor->containerId,
-            list<TaskInfo>({task})));
+        .onAny(defer(self(),
+                     &Self::runTasks,
+                     lambda::_1,
+                     frameworkId,
+                     executorId,
+                     executor->containerId,
+                     list<TaskInfo>({task})));
       break;
     }
     default:
@@ -1705,10 +1692,10 @@ void Slave::killTask(
 
       framework->pending[executorId].erase(taskId);
       if (framework->pending[executorId].empty()) {
-          framework->pending.erase(executorId);
-          if (framework->pending.empty() && framework->executors.empty()) {
-            removeFramework(framework);
-          }
+        framework->pending.erase(executorId);
+        if (framework->pending.empty() && framework->executors.empty()) {
+          removeFramework(framework);
+        }
       }
       return;
     }
@@ -1716,7 +1703,7 @@ void Slave::killTask(
 
   Executor* executor = framework->getExecutor(taskId);
   if (executor == NULL) {
-      LOG(WARNING) << "Cannot kill task " << taskId
+    LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId
                  << " because no corresponding executor is running";
     // We send a TASK_LOST update because this task has never
@@ -2188,7 +2175,7 @@ void Slave::_statusUpdateAcknowledgement(
   Executor* executor = framework->getExecutor(taskId);
   if (executor == NULL) {
     LOG(ERROR) << "Status update acknowledgement (UUID: " << uuid
-              << ") for task " << taskId
+               << ") for task " << taskId
                << " of unknown executor";
     return;
   }
@@ -2336,14 +2323,13 @@ void Slave::registerExecutor(
       }
 
       containerizer->update(executor->containerId, resources)
-        .onAny(defer(
-            self(),
-            &Self::runTasks,
-            lambda::_1,
-            frameworkId,
-            executorId,
-            executor->containerId,
-            executor->queuedTasks.values()));
+        .onAny(defer(self(),
+                     &Self::runTasks,
+                     lambda::_1,
+                     frameworkId,
+                     executorId,
+                     executor->containerId,
+                     executor->queuedTasks.values()));
       break;
     }
     default:
@@ -2433,13 +2419,12 @@ void Slave::reregisterExecutor(
 
       // Tell the containerizer to update the resources.
       containerizer->update(executor->containerId, executor->resources)
-        .onAny(defer(
-            self(),
-            &Self::_reregisterExecutor,
-            lambda::_1,
-            frameworkId,
-            executorId,
-            executor->containerId));
+        .onAny(defer(self(),
+                     &Self::_reregisterExecutor,
+                     lambda::_1,
+                     frameworkId,
+                     executorId,
+                     executor->containerId));
 
       hashmap<TaskID, TaskInfo> unackedTasks;
       foreach (const TaskInfo& task, tasks) {
@@ -2461,9 +2446,9 @@ void Slave::reregisterExecutor(
       foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
-          LOG(INFO)
-            << "Transitioning STAGED task " << task->task_id() << " to LOST"
-            << " because it is unknown to the executor " << executorId;
+          LOG(INFO) << "Transitioning STAGED task " << task->task_id()
+                    << " to LOST because it is unknown to the executor "
+                    << executorId;
 
           const StatusUpdate& update = protobuf::createStatusUpdate(
               frameworkId,
@@ -2715,24 +2700,12 @@ void Slave::_statusUpdate(
 
   if (checkpoint) {
     // Ask the status update manager to checkpoint and reliably send the update.
-    statusUpdateManager->update(
-        update,
-        info.id(),
-        executorId,
-        containerId)
-      .onAny(defer(self(),
-                  &Slave::__statusUpdate,
-                  lambda::_1,
-                  update,
-                  pid));
+    statusUpdateManager->update(update, info.id(), executorId, containerId)
+      .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
   } else {
     // Ask the status update manager to just retry the update.
     statusUpdateManager->update(update, info.id())
-      .onAny(defer(self(),
-                  &Slave::__statusUpdate,
-                  lambda::_1,
-                  update,
-                  pid));
+      .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
   }
 }
 
@@ -2920,11 +2893,8 @@ void Slave::pingOld(const UPID& from, const string& body)
   // when this occurs.
   Clock::cancel(pingTimer);
 
-  pingTimer = delay(
-      MASTER_PING_TIMEOUT(),
-      self(),
-      &Slave::pingTimeout,
-      detection);
+  pingTimer =
+    delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection);
 
   send(from, "PONG");
 }
@@ -2952,11 +2922,8 @@ void Slave::ping(const UPID& from, bool connected)
   // when this occurs.
   Clock::cancel(pingTimer);
 
-  pingTimer = delay(
-      MASTER_PING_TIMEOUT(),
-      self(),
-      &Slave::pingTimeout,
-      detection);
+  pingTimer =
+    delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection);
 
   send(from, PongSlaveMessage());
 }
@@ -3065,21 +3032,21 @@ ExecutorInfo Slave::getExecutorInfo(
         task.command().uris());
 
     if (task.command().has_environment()) {
-        executor.mutable_command()->mutable_environment()->MergeFrom(
-            task.command().environment());
+      executor.mutable_command()->mutable_environment()->MergeFrom(
+          task.command().environment());
     }
 
     if (task.command().has_container()) {
-        executor.mutable_command()->mutable_container()->MergeFrom(
-            task.command().container());
+      executor.mutable_command()->mutable_container()->MergeFrom(
+          task.command().container());
     }
 
     if (task.command().has_user()) {
-        executor.mutable_command()->set_user(task.command().user());
+      executor.mutable_command()->set_user(task.command().user());
     }
 
-    Result<string> path = os::realpath(
-        path::join(flags.launcher_dir, "mesos-executor"));
+    Result<string> path =
+      os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
 
     // Explicitly set 'shell' to true since we want to use the shell
     // for running the mesos-executor (and even though this is the
@@ -3091,9 +3058,7 @@ ExecutorInfo Slave::getExecutorInfo(
     } else {
       executor.mutable_command()->set_value(
           "echo '" +
-          (path.isError()
-           ? path.error()
-           : "No such file or directory") +
+          (path.isError() ? path.error() : "No such file or directory") +
           "'; exit 1");
     }
 
@@ -3122,8 +3087,7 @@ ExecutorInfo Slave::getExecutorInfo(
 
   // Add in any default ContainerInfo.
   if (!executor.has_container() && flags.default_container_info.isSome()) {
-    executor.mutable_container()->CopyFrom(
-        flags.default_container_info.get());
+    executor.mutable_container()->CopyFrom(flags.default_container_info.get());
   }
 
   return executor;
@@ -4141,9 +4105,7 @@ void Slave::qosCorrections()
 void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
 {
   // Make sure correction handler is scheduled again.
-  delay(flags.qos_correction_interval_min,
-        self(),
-        &Self::qosCorrections);
+  delay(flags.qos_correction_interval_min, self(), &Self::qosCorrections);
 
   // Verify slave state.
   CHECK(state == RECOVERING || state == DISCONNECTED ||
@@ -4275,8 +4237,7 @@ Future<ResourceUsage> Slave::usage()
   }
 
   return await(futures).then(
-      [usage](const list<Future<ResourceStatistics>>& futures)
-        -> Future<ResourceUsage> {
+      [usage](const list<Future<ResourceStatistics>>& futures) {
         // NOTE: We add ResourceUsage::Executor to 'usage' the same
         // order as we push future to 'futures'. So the variables
         // 'future' and 'executor' below should be in sync.
@@ -4298,7 +4259,7 @@ Future<ResourceUsage> Slave::usage()
           }
         }
 
-        return *usage;
+        return Future<ResourceUsage>(*usage);
       });
 }
 
@@ -4442,8 +4403,8 @@ void Slave::sendExecutorTerminatedStatusUpdate(
       taskId,
       taskState,
       TaskStatus::SOURCE_SLAVE,
-      termination.isReady() ? termination.get().message() :
-                              "Abnormal executor termination",
+      termination.isReady() ? termination.get().message()
+                            : "Abnormal executor termination",
       reason,
       executor->id),
       UPID());
@@ -4635,10 +4596,7 @@ Executor* Framework::launchExecutor(
             << " in work directory '" << directory << "'";
 
   slave->files->attach(executor->directory, executor->directory)
-    .onAny(defer(slave,
-                 &Slave::fileAttached,
-                 lambda::_1,
-                 executor->directory));
+    .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
 
   // Tell the containerizer to launch the executor.
   // NOTE: We modify the ExecutorInfo to include the task's
@@ -4686,11 +4644,11 @@ Executor* Framework::launchExecutor(
   }
 
   launch.onAny(defer(slave,
-               &Slave::executorLaunched,
-               id(),
-               executor->id,
-               containerId,
-               lambda::_1));
+                     &Slave::executorLaunched,
+                     id(),
+                     executor->id,
+                     containerId,
+                     lambda::_1));
 
   // Make sure the executor registers within the given timeout.
   delay(slave->flags.executor_registration_timeout,
@@ -4818,10 +4776,7 @@ void Framework::recoverExecutor(const ExecutorState& state)
 
   // Expose the executor's files.
   slave->files->attach(executor->directory, executor->directory)
-    .onAny(defer(slave,
-                 &Slave::fileAttached,
-                 lambda::_1,
-                 executor->directory));
+    .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
 
   // Add the executor to the framework.
   executors[executor->id] = executor;
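
Two hunks in this patch go slightly beyond whitespace. The runTask loop
swaps the spelled-out boost::circular_buffer iterator type for auto and
hoists end(); caching end() is safe here because the erase() is
immediately followed by a break. And in Slave::usage(), the lambda's
explicit '-> Future<ResourceUsage>' trailing return type is dropped, so
the return type is now deduced from the return expression, which is why
the body changes to return 'Future<ResourceUsage>(*usage)' explicitly
(a bare 'return *usage;' would deduce ResourceUsage instead). A minimal
sketch of the same deduction, assuming libprocess's Future is on the
include path:

    #include <process/future.hpp>

    using process::Future;

    Future<int> answer()
    {
      // With no trailing return type, the deduced type comes from the
      // return expression: Future<int> here, not int.
      auto deferred = []() { return Future<int>(42); };
      return deferred();
    }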


[2/2] mesos git commit: Sent StatusUpdates if checkpointed resources don't exist on the slave.

Posted by be...@apache.org.
Sent StatusUpdates if checkpointed resources don't exist on the slave.

No bug has been observed (yet), but I realized I forgot about this in
the dynamic reservations patches.
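
In outline (condensed from the src/slave/slave.cpp diff below), where
the slave previously CHECK-failed, and thus aborted, on an unknown
checkpointed resource, it now logs a warning and answers with TASK_LOST:

    // Condensed sketch; 'needCheckpointing' and 'checkpointedResources'
    // are the existing slave-side helper and state used by this patch.
    Resources checkpointedTaskResources =
      Resources(task.resources()).filter(needCheckpointing);

    foreach (const Resource& resource, checkpointedTaskResources) {
      if (!checkpointedResources.contains(resource)) {
        LOG(WARNING) << "Unknown checkpointed resource " << resource;

        statusUpdate(protobuf::createStatusUpdate(
            frameworkId,
            info.id(),
            task.task_id(),
            TASK_LOST,
            TaskStatus::SOURCE_SLAVE,
            "The checkpointed resources being used by the task are "
            "unknown to the slave",
            TaskStatus::REASON_RESOURCES_UNKNOWN), UPID());

        return;
      }
    }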

Review: https://reviews.apache.org/r/35433


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/efeb1183
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/efeb1183
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/efeb1183

Branch: refs/heads/master
Commit: efeb1183760e4bd9dd73a2a65af16274673a721f
Parents: ccf6c25
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jun 19 12:03:47 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jun 19 12:03:48 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto |   1 +
 src/slave/slave.cpp       | 110 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 93 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8df1211..81008ed 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -893,6 +893,7 @@ message TaskStatus {
     REASON_MASTER_DISCONNECTED = 7;
     REASON_MEMORY_LIMIT = 8;
     REASON_RECONCILIATION = 9;
+    REASON_RESOURCES_UNKNOWN = 18;
     REASON_SLAVE_DISCONNECTED = 10;
     REASON_SLAVE_REMOVED = 11;
     REASON_SLAVE_RESTARTED = 12;
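
Note on the numbering above: the new value is inserted alphabetically
but takes tag 18, the next unused number, rather than renumbering its
neighbors; protobuf enum tags are part of the wire format, so the
existing values (10-17) must keep their numbers for compatibility.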

http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6c539b5..19b7508 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1332,6 +1332,16 @@ void Slave::_runTask(
     framework->pending[executorId].erase(task.task_id());
     if (framework->pending[executorId].empty()) {
       framework->pending.erase(executorId);
+      // NOTE: Ideally we would perform the following check here:
+      //
+      //   if (framework->executors.empty() &&
+      //       framework->pending.empty()) {
+      //     removeFramework(framework);
+      //   }
+      //
+      // However, we need 'framework' to stay valid for the rest of
+      // this function. As such, we perform the check before each of
+      // the 'return' statements below.
     }
   } else {
     LOG(WARNING) << "Ignoring run task " << task.task_id()
@@ -1347,9 +1357,12 @@ void Slave::_runTask(
                  << " of framework " << frameworkId
                  << " because the framework is terminating";
 
+    // Refer to the comment after 'framework->pending.erase' above
+    // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
     }
+
     return;
   }
 
@@ -1373,6 +1386,8 @@ void Slave::_runTask(
     // manager to stop retrying for its un-acked updates.
     statusUpdate(update, UPID());
 
+    // Refer to the comment after 'framework->pending.erase' above
+    // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
     }
@@ -1380,28 +1395,75 @@ void Slave::_runTask(
     return;
   }
 
-  // NOTE: If the task or executor uses persistent volumes, the slave
-  // should already know about it. In case the slave doesn't know
-  // about them (e.g., CheckpointResourcesMessage was dropped or came
-  // out of order), we simply fail the slave to be safe.
-  Resources volumes = Resources(task.resources()).persistentVolumes();
+  // NOTE: If the task or executor uses resources that are
+  // checkpointed on the slave (e.g. persistent volumes), we should
+  // already know about it. If the slave doesn't know about them (e.g.
+  // CheckpointResourcesMessage was dropped or came out of order),
+  // we send TASK_LOST status updates here since restarting the task
+  // may succeed in the event that CheckpointResourcesMessage arrives
+  // out of order.
+  Resources checkpointedTaskResources =
+    Resources(task.resources()).filter(needCheckpointing);
 
-  foreach (const Resource& volume, volumes) {
-    CHECK(checkpointedResources.contains(volume))
-      << "Unknown persistent volume " << volume
-      << " for task " << task.task_id()
-      << " of framework " << frameworkId;
+  foreach (const Resource& resource, checkpointedTaskResources) {
+    if (!checkpointedResources.contains(resource)) {
+      LOG(WARNING) << "Unknown checkpointed resource " << resource
+                   << " for task " << task.task_id()
+                   << " of framework " << frameworkId;
+
+      const StatusUpdate update = protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          task.task_id(),
+          TASK_LOST,
+          TaskStatus::SOURCE_SLAVE,
+          "The checkpointed resources being used by the task are unknown to "
+          "the slave",
+          TaskStatus::REASON_RESOURCES_UNKNOWN);
+
+      statusUpdate(update, UPID());
+
+      // Refer to the comment after 'framework->pending.erase' above
+      // for why we need this.
+      if (framework->executors.empty() && framework->pending.empty()) {
+        removeFramework(framework);
+      }
+
+      return;
+    }
   }
 
   if (task.has_executor()) {
-    Resources volumes =
-      Resources(task.executor().resources()).persistentVolumes();
-
-    foreach (const Resource& volume, volumes) {
-      CHECK(checkpointedResources.contains(volume))
-        << "Unknown persistent volume " << volume
-        << " for executor " << task.executor().executor_id()
-        << " of framework " << frameworkId;
+    Resources checkpointedExecutorResources =
+      Resources(task.executor().resources()).filter(needCheckpointing);
+
+    foreach (const Resource& resource, checkpointedExecutorResources) {
+      if (!checkpointedResources.contains(resource)) {
+        LOG(WARNING) << "Unknown checkpointed resource " << resource
+                     << " for executor " << task.executor().executor_id()
+                     << " of framework " << frameworkId;
+
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            task.task_id(),
+            TASK_LOST,
+            TaskStatus::SOURCE_SLAVE,
+            "The checkpointed resources being used by the executor are unknown "
+            "to the slave",
+            TaskStatus::REASON_RESOURCES_UNKNOWN,
+            task.executor().executor_id());
+
+        statusUpdate(update, UPID());
+
+        // Refer to the comment after 'framework->pending.erase' above
+        // for why we need this.
+        if (framework->executors.empty() && framework->pending.empty()) {
+          removeFramework(framework);
+        }
+
+        return;
+      }
     }
   }
 
@@ -1415,6 +1477,12 @@ void Slave::_runTask(
                  << " of framework " << frameworkId
                  << " because the slave is terminating";
 
+    // Refer to the comment after 'framework->pending.erase' above
+    // for why we need this.
+    if (framework->executors.empty() && framework->pending.empty()) {
+      removeFramework(framework);
+    }
+
     // We don't send a TASK_LOST here because the slave is
     // terminating.
     return;
@@ -1507,6 +1575,12 @@ void Slave::_runTask(
                  << " is in unexpected state " << executor->state;
       break;
   }
+
+  // Refer to the comment after 'framework->pending.erase' above
+  // for why we need this.
+  if (framework->executors.empty() && framework->pending.empty()) {
+    removeFramework(framework);
+  }
 }
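
The NOTE added in the first hunk is the key to reading the rest of the
patch: because 'framework' must stay valid for the remainder of
_runTask, the remove-if-empty check cannot run where the pending entry
is erased, so it is repeated immediately before every early return and
once at the end. A self-contained sketch of the pattern, with
simplified stand-in types:

    #include <map>
    #include <memory>
    #include <set>
    #include <string>

    struct Framework
    {
      std::set<std::string> executors;
      std::map<std::string, std::set<std::string>> pending;
    };

    std::map<std::string, std::unique_ptr<Framework>> frameworks;

    void removeFramework(const std::string& frameworkId)
    {
      frameworks.erase(frameworkId);  // Invalidates pointers into the map.
    }

    void runTask(
        const std::string& frameworkId,
        const std::string& executorId,
        const std::string& taskId,
        bool terminating)
    {
      Framework* framework = frameworks.at(frameworkId).get();

      framework->pending[executorId].erase(taskId);
      if (framework->pending[executorId].empty()) {
        framework->pending.erase(executorId);
        // Cannot remove the framework here: 'framework' is still
        // dereferenced below.
      }

      if (terminating) {
        // Check just before returning, once 'framework' is done with.
        if (framework->executors.empty() && framework->pending.empty()) {
          removeFramework(frameworkId);
        }
        return;
      }

      // ... launch the task ...

      if (framework->executors.empty() && framework->pending.empty()) {
        removeFramework(frameworkId);
      }
    }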