You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/14 20:07:32 UTC

[09/18] git commit: Fixed master to properly handle TASK_LOST updates generated by it.

Fixed the master to properly handle the TASK_LOST updates that it generates itself.

Review: https://reviews.apache.org/r/13446


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd3584a7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd3584a7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd3584a7

Branch: refs/heads/master
Commit: fd3584a71f68015fcfff70cc889b32fac28f941b
Parents: ba2ee7c
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 11:08:15 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:59 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp               | 121 ++++++++++++++-----------------
 src/messages/messages.proto         |   2 +
 src/tests/fault_tolerance_tests.cpp |  22 +++---
 3 files changed, 68 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0675b52..6530008 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -875,6 +875,9 @@ void Master::launchTasks(const FrameworkID& frameworkId,
         status->set_message("Task launched with invalid offer");
         update->set_timestamp(Clock::now().secs());
         update->set_uuid(UUID::random().toBytes());
+
+        LOG(INFO) << "Sending status update " << *update
+                  << " for launch task attempt on invalid offer " << offerId;
         send(framework->pid, message);
       }
     }
@@ -1163,10 +1166,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   // NOTE: We cannot use 'from' here to identify the slave as this is
   // now sent by the StatusUpdateManagerProcess. Only 'pid' can
   // be used to identify the slave.
-  LOG(INFO) << "Status update from " << pid
-            << ": task " << status.task_id()
-            << " of framework " << update.framework_id()
-            << " is now in state " << status.state();
+  LOG(INFO) << "Status update " << update << " from " << pid;
 
   Slave* slave = getSlave(update.slave_id());
   if (slave == NULL) {
@@ -1174,12 +1174,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
       // If the slave is deactivated, we have already informed
       // frameworks that its tasks were LOST, so the slave should
       // shut down.
-      LOG(WARNING) << "Ignoring status update from deactivated slave " << pid
+      LOG(WARNING) << "Ignoring status update " << update
+                   << " from deactivated slave " << pid
                    << " with id " << update.slave_id() << " ; asking slave "
                    << " to shutdown";
       send(pid, ShutdownMessage());
     } else {
-      LOG(WARNING) << "Ignoring status update from unknown slave " << pid
+      LOG(WARNING) << "Ignoring status update " << update
+                   << " from unknown slave " << pid
                    << " with id " << update.slave_id();
     }
     stats.invalidStatusUpdates++;
@@ -1190,7 +1192,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 
   Framework* framework = getFramework(update.framework_id());
   if (framework == NULL) {
-    LOG(WARNING) << "Ignoring status update from " << pid << " ("
+    LOG(WARNING) << "Ignoring status update " << update
+                 << " from " << pid << " ("
                  << slave->info.hostname() << "): error, couldn't lookup "
                  << "framework " << update.framework_id();
     stats.invalidStatusUpdates++;
@@ -1206,7 +1209,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   // Lookup the task and see if we need to update anything locally.
   Task* task = slave->getTask(update.framework_id(), status.task_id());
   if (task == NULL) {
-    LOG(WARNING) << "Status update from " << pid << " ("
+    LOG(WARNING) << "Status update " << update
+                 << " from " << pid << " ("
                  << slave->info.hostname() << "): error, couldn't lookup "
                  << "task " << status.task_id();
     stats.invalidStatusUpdates++;
@@ -1225,10 +1229,11 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 }
 
 
-void Master::exitedExecutor(const SlaveID& slaveId,
-                            const FrameworkID& frameworkId,
-                            const ExecutorID& executorId,
-                            int32_t status)
+void Master::exitedExecutor(
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    int32_t status)
 {
   // Only update master's internal data structures here for properly accounting.
   // The TASK_LOST updates are handled by the slave.
@@ -1671,6 +1676,7 @@ void Master::processTasks(Offer* offer,
           TASK_LOST,
           error.get());
 
+      LOG(INFO) << "Sending status update " << update << " for invalid task";
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update);
       send(framework->pid, message);
@@ -1786,20 +1792,14 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
                    << " of framework " << task->framework_id()
                    << " unknown to the slave " << slave->id;
 
-      Framework* framework = getFramework(task->framework_id());
-      if (framework != NULL) {
-        const StatusUpdate& update = protobuf::createStatusUpdate(
-            task->framework_id(),
-            slave->id,
-            task->task_id(),
-            TASK_LOST,
-            "Task was not received by the slave");
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          task->framework_id(),
+          slave->id,
+          task->task_id(),
+          TASK_LOST,
+          "Task is unknown to the slave");
 
-        StatusUpdateMessage message;
-        message.mutable_update()->CopyFrom(update);
-        send(framework->pid, message);
-      }
-      removeTask(task);
+      statusUpdate(update, UPID());
     }
   }
 
@@ -1893,6 +1893,10 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
 
 void Master::removeFramework(Framework* framework)
 {
+  CHECK_NOTNULL(framework);
+
+  LOG(INFO) << "Removing framework " << framework->id;
+
   if (framework->active) {
     // Tell the allocator to stop allocating resources to this framework.
     allocator->frameworkDeactivated(framework->id);
@@ -1961,6 +1965,10 @@ void Master::removeFramework(Slave* slave, Framework* framework)
   CHECK_NOTNULL(slave);
   CHECK_NOTNULL(framework);
 
+  LOG(INFO) << "Removing framework " << framework->id
+            << " from slave " << slave->id
+            << " (" << slave->info.hostname() << ")";
+
   // Remove pointers to framework's tasks in slaves, and send status updates.
   foreachvalue (Task* task, utils::copy(slave->tasks)) {
     // Remove tasks that belong to this framework.
@@ -1968,25 +1976,16 @@ void Master::removeFramework(Slave* slave, Framework* framework)
       // A framework might not actually exist because the master failed
       // over and the framework hasn't reconnected yet. For more info
       // please see the comments in 'removeFramework(Framework*)'.
-      StatusUpdateMessage message;
-      message.mutable_update()->CopyFrom(
-          protobuf::createStatusUpdate(
-              task->framework_id(),
-              task->slave_id(),
-              task->task_id(),
-              TASK_LOST,
-              "Slave " + slave->info.hostname() + " disconnected",
-              (task->has_executor_id() ?
-                  Option<ExecutorID>(task->executor_id()) : None())));
-
-      LOG(INFO) << "Sending status update " << message.update()
-                << " due to disconnected slave " << slave->id
-                << " (" << slave->info.hostname() << ")";
-
-      send(framework->pid, message);
-
-      // Remove the task from slave and framework.
-      removeTask(task);
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+        task->framework_id(),
+        task->slave_id(),
+        task->task_id(),
+        TASK_LOST,
+        "Slave " + slave->info.hostname() + " disconnected",
+        (task->has_executor_id()
+            ? Option<ExecutorID>(task->executor_id()) : None()));
+
+      statusUpdate(update, UPID());
     }
   }
 
@@ -2122,6 +2121,9 @@ void Master::removeSlave(Slave* slave)
 {
   CHECK_NOTNULL(slave);
 
+  LOG(INFO) << "Removing slave " << slave->id
+            << " (" << slave->info.hostname() << ")";
+
   // We do this first, to make sure any of the resources recovered
   // below (e.g., removeTask()) are ignored by the allocator.
   if (!slave->disconnected) {
@@ -2130,8 +2132,6 @@ void Master::removeSlave(Slave* slave)
 
   // Remove pointers to slave's tasks in frameworks, and send status updates
   foreachvalue (Task* task, utils::copy(slave->tasks)) {
-    Framework* framework = getFramework(task->framework_id());
-
     // A framework might not actually exist because the master failed
     // over and the framework hasn't reconnected. This can be a tricky
     // situation for frameworks that want to have high-availability,
@@ -2140,25 +2140,16 @@ void Master::removeSlave(Slave* slave)
     // want to do is create a local Framework object to represent that
     // framework until it fails over. See the TODO above in
     // Master::reregisterSlave.
-    if (framework != NULL) {
-      StatusUpdateMessage message;
-      message.mutable_update()->CopyFrom(
-          protobuf::createStatusUpdate(
-              task->framework_id(),
-              task->slave_id(),
-              task->task_id(),
-              TASK_LOST,
-              "Slave " + slave->info.hostname() + " removed",
-              (task->has_executor_id() ?
-                  Option<ExecutorID>(task->executor_id()) : None())));
-
-      LOG(INFO) << "Sending status update " << message.update()
-                << " due to the removal of slave "
-                << slave->id << " (" << slave->info.hostname() << ")";
-
-      send(framework->pid, message);
-    }
-    removeTask(task);
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        task->framework_id(),
+        task->slave_id(),
+        task->task_id(),
+        TASK_LOST,
+        "Slave " + slave->info.hostname() + " removed",
+        (task->has_executor_id() ?
+            Option<ExecutorID>(task->executor_id()) : None()));
+
+    statusUpdate(update, UPID());
   }
 
   foreach (Offer* offer, utils::copy(slave->offers)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 19d4b38..4d400c2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -185,6 +185,8 @@ message KillTaskMessage {
 }
 
 
+// NOTE: If 'pid' is present, scheduler driver sends an
+// acknowledgement to the pid.
 message StatusUpdateMessage {
   required StatusUpdate update = 1;
   optional string pid = 2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 7bfe3b1..d74463f 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -431,18 +431,16 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
 
   // At this point, the slave still thinks it's registered, so we
   // simulate a status update coming from the slave.
-  StatusUpdateMessage statusUpdate;
-  statusUpdate.set_pid(stringify(slave.get()));
-  statusUpdate.mutable_update()->mutable_framework_id()->set_value(
-      frameworkId.get().value());
-  statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
-  statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
-  statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
-      "task_id");
-  statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
-  statusUpdate.mutable_update()->set_timestamp(Clock::now().secs());
-  statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
-  process::post(master.get(), statusUpdate);
+  TaskID taskId;
+  taskId.set_value("task_id");
+  const StatusUpdate& update = createStatusUpdate(
+      frameworkId.get(), slaveId, taskId, TASK_RUNNING);
+
+  StatusUpdateMessage message;
+  message.mutable_update()->CopyFrom(update);
+  message.set_pid(stringify(slave.get()));
+
+  process::post(master.get(), message);
 
   // The master should shutdown the slave upon receiving the update.
   AWAIT_READY(shutdownMessage);