You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/19 21:07:26 UTC

[2/2] mesos git commit: Sent StatusUpdates if checkpointed resources don't exist on the slave.

Sent StatusUpdates if checkpointed resources don't exist on the slave.

No bug has been observed (yet), but I realized I forgot to handle this
case in the dynamic reservations patches.

Review: https://reviews.apache.org/r/35433


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/efeb1183
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/efeb1183
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/efeb1183

Branch: refs/heads/master
Commit: efeb1183760e4bd9dd73a2a65af16274673a721f
Parents: ccf6c25
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jun 19 12:03:47 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jun 19 12:03:48 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto |   1 +
 src/slave/slave.cpp       | 110 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 93 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8df1211..81008ed 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -893,6 +893,7 @@ message TaskStatus {
     REASON_MASTER_DISCONNECTED = 7;
     REASON_MEMORY_LIMIT = 8;
     REASON_RECONCILIATION = 9;
+    REASON_RESOURCES_UNKNOWN = 18;
     REASON_SLAVE_DISCONNECTED = 10;
     REASON_SLAVE_REMOVED = 11;
     REASON_SLAVE_RESTARTED = 12;

http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6c539b5..19b7508 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1332,6 +1332,16 @@ void Slave::_runTask(
     framework->pending[executorId].erase(task.task_id());
     if (framework->pending[executorId].empty()) {
       framework->pending.erase(executorId);
+      // NOTE: Ideally we would perform the following check here:
+      //
+      //   if (framework->executors.empty() &&
+      //       framework->pending.empty()) {
+      //     removeFramework(framework);
+      //   }
+      //
+      // However, we need 'framework' to stay valid for the rest of
+      // this function. As such, we perform the check before each of
+      // the 'return' statements below.
     }
   } else {
     LOG(WARNING) << "Ignoring run task " << task.task_id()
@@ -1347,9 +1357,12 @@ void Slave::_runTask(
                  << " of framework " << frameworkId
                  << " because the framework is terminating";
 
+    // Refer to the comment after 'framework->pending.erase' above
+    // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
     }
+
     return;
   }
 
@@ -1373,6 +1386,8 @@ void Slave::_runTask(
     // manager to stop retrying for its un-acked updates.
     statusUpdate(update, UPID());
 
+    // Refer to the comment after 'framework->pending.erase' above
+    // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
     }
@@ -1380,28 +1395,75 @@ void Slave::_runTask(
     return;
   }
 
-  // NOTE: If the task or executor uses persistent volumes, the slave
-  // should already know about it. In case the slave doesn't know
-  // about them (e.g., CheckpointResourcesMessage was dropped or came
-  // out of order), we simply fail the slave to be safe.
-  Resources volumes = Resources(task.resources()).persistentVolumes();
+  // NOTE: If the task or executor uses resources that are
+  // checkpointed on the slave (e.g. persistent volumes), we should
+  // already know about them. If the slave doesn't know about them
+  // (e.g. CheckpointResourcesMessage was dropped or came out of
+  // order), we send TASK_LOST status updates here since restarting
+  // the task may succeed in the event that
+  // CheckpointResourcesMessage arrives out of order.
+  Resources checkpointedTaskResources =
+    Resources(task.resources()).filter(needCheckpointing);
 
-  foreach (const Resource& volume, volumes) {
-    CHECK(checkpointedResources.contains(volume))
-      << "Unknown persistent volume " << volume
-      << " for task " << task.task_id()
-      << " of framework " << frameworkId;
+  foreach (const Resource& resource, checkpointedTaskResources) {
+    if (!checkpointedResources.contains(resource)) {
+      LOG(WARNING) << "Unknown checkpointed resource " << resource
+                   << " for task " << task.task_id()
+                   << " of framework " << frameworkId;
+
+      const StatusUpdate update = protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          task.task_id(),
+          TASK_LOST,
+          TaskStatus::SOURCE_SLAVE,
+          "The checkpointed resources being used by the task are unknown to "
+          "the slave",
+          TaskStatus::REASON_RESOURCES_UNKNOWN);
+
+      statusUpdate(update, UPID());
+
+      // Refer to the comment after 'framework->pending.erase' above
+      // for why we need this.
+      if (framework->executors.empty() && framework->pending.empty()) {
+        removeFramework(framework);
+      }
+
+      return;
+    }
   }
 
   if (task.has_executor()) {
-    Resources volumes =
-      Resources(task.executor().resources()).persistentVolumes();
-
-    foreach (const Resource& volume, volumes) {
-      CHECK(checkpointedResources.contains(volume))
-        << "Unknown persistent volume " << volume
-        << " for executor " << task.executor().executor_id()
-        << " of framework " << frameworkId;
+    Resources checkpointedExecutorResources =
+      Resources(task.executor().resources()).filter(needCheckpointing);
+
+    foreach (const Resource& resource, checkpointedExecutorResources) {
+      if (!checkpointedResources.contains(resource)) {
+        LOG(WARNING) << "Unknown checkpointed resource " << resource
+                     << " for executor " << task.executor().executor_id()
+                     << " of framework " << frameworkId;
+
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            task.task_id(),
+            TASK_LOST,
+            TaskStatus::SOURCE_SLAVE,
+            "The checkpointed resources being used by the executor are unknown "
+            "to the slave",
+            TaskStatus::REASON_RESOURCES_UNKNOWN,
+            task.executor().executor_id());
+
+        statusUpdate(update, UPID());
+
+        // Refer to the comment after 'framework->pending.erase' above
+        // for why we need this.
+        if (framework->executors.empty() && framework->pending.empty()) {
+          removeFramework(framework);
+        }
+
+        return;
+      }
     }
   }
 
@@ -1415,6 +1477,12 @@ void Slave::_runTask(
                  << " of framework " << frameworkId
                  << " because the slave is terminating";
 
+    // Refer to the comment after 'framework->pending.erase' above
+    // for why we need this.
+    if (framework->executors.empty() && framework->pending.empty()) {
+      removeFramework(framework);
+    }
+
     // We don't send a TASK_LOST here because the slave is
     // terminating.
     return;
@@ -1507,6 +1575,12 @@ void Slave::_runTask(
                  << " is in unexpected state " << executor->state;
       break;
   }
+
+  // Refer to the comment after 'framework->pending.erase' above
+  // for why we need this.
+  if (framework->executors.empty() && framework->pending.empty()) {
+    removeFramework(framework);
+  }
 }