You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/07/01 21:13:22 UTC

git commit: Fixed a bug in master to properly remove a non-checkpointing framework from a checkpointing slave.

Updated Branches:
  refs/heads/master ddbb090e7 -> 777b745d1


Fixed a bug in master to properly remove a non-checkpointing
framework from a checkpointing slave.

Review: https://reviews.apache.org/r/12182


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/777b745d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/777b745d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/777b745d

Branch: refs/heads/master
Commit: 777b745d1e6745997aa6aa086a6126f9aec0be43
Parents: ddbb090
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Jun 28 14:40:49 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Jul 1 12:13:10 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp              | 56 ++++++++++++++++++---------------
 src/tests/slave_recovery_tests.cpp | 42 ++++++++++++++++++-------
 2 files changed, 62 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/777b745d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4da8773..695fb93 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -470,19 +470,25 @@ void Master::exited(const UPID& pid)
       } else {
         // If a slave is checkpointing, remove all non-checkpointing
         // frameworks from the slave.
-        hashset<FrameworkID> ids;
-        foreachvalue (Task* task, utils::copy(slave->tasks)) {
-          if (!ids.contains(task->framework_id())) {
-            ids.insert(task->framework_id());
-            Framework* framework = getFramework(task->framework_id());
-            if (framework != NULL && !framework->info.checkpoint()) {
-              LOG(INFO) << "Removing non-checkpointing framework "
-                        << task->framework_id()
-                        << " from disconnected slave " << slave->id
-                        << "(" << slave->info.hostname() << ")";
-
-              removeFramework(slave, framework);
-            }
+
+        // First, collect all the frameworks running on this slave.
+        hashset<FrameworkID> frameworkIds;
+        foreachvalue (Task* task, slave->tasks) {
+          frameworkIds.insert(task->framework_id());
+        }
+        foreachkey (const FrameworkID& frameworkId, slave->executors) {
+          frameworkIds.insert(frameworkId);
+        }
+
+        // Now, remove all the non-checkpointing frameworks.
+        foreach (const FrameworkID& frameworkId, frameworkIds) {
+          Framework* framework = getFramework(frameworkId);
+          if (framework != NULL && !framework->info.checkpoint()) {
+            LOG(INFO) << "Removing non-checkpointing framework " << frameworkId
+                      << " from disconnected slave " << slave->id
+                      << "(" << slave->info.hostname() << ")";
+
+            removeFramework(slave, framework);
           }
         }
       }
@@ -1792,20 +1798,20 @@ void Master::removeFramework(Slave* slave, Framework* framework)
       // over and the framework hasn't reconnected yet. For more info
       // please see the comments in 'removeFramework(Framework*)'.
       StatusUpdateMessage message;
-      StatusUpdate* update = message.mutable_update();
-      update->mutable_framework_id()->MergeFrom(task->framework_id());
+      message.mutable_update()->CopyFrom(
+          protobuf::createStatusUpdate(
+              task->framework_id(),
+              task->slave_id(),
+              task->task_id(),
+              TASK_LOST,
+              "Slave " + slave->info.hostname() + " disconnected",
+              (task->has_executor_id() ?
+                  Option<ExecutorID>(task->executor_id()) : None())));
 
-      if (task->has_executor_id()) {
-        update->mutable_executor_id()->MergeFrom(task->executor_id());
-      }
+      LOG(INFO) << "Sending status update " << message.update()
+                << " due to disconnected slave " << slave->id
+                << " (" << slave->info.hostname() << ")";
 
-      update->mutable_slave_id()->MergeFrom(task->slave_id());
-      TaskStatus* status = update->mutable_status();
-      status->mutable_task_id()->MergeFrom(task->task_id());
-      status->set_state(TASK_LOST);
-      status->set_message("Slave " + slave->info.hostname() + " disconnected");
-      update->set_timestamp(Clock::now().secs());
-      update->set_uuid(UUID::random().toBytes());
       send(framework->pid, message);
 
       // Remove the task from slave and framework.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/777b745d/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0195e68..14df749 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -798,28 +798,48 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
-  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  // Launch 2 tasks from this offer.
   vector<TaskInfo> tasks;
-  tasks.push_back(task); // Long-running task
+  Offer offer = offers.get()[0];
 
-  Future<Nothing> update;
+  Offer offer1 = offer;
+  offer1.mutable_resources()->CopyFrom(Resources::parse("cpus:1;mem:512"));
+  tasks.push_back(createTask(offer1, "sleep 1000")); // Long-running task
+
+  Offer offer2 = offer;
+  offer2.mutable_resources()->CopyFrom(Resources::parse("cpus:1;mem:512"));
+  tasks.push_back(createTask(offer2, "sleep 1000")); // Long-running task
+
+  ASSERT_LE(Resources(offer1.resources()) + Resources(offer2.resources()),
+            Resources(offer.resources()));
+
+  Future<Nothing> update1;
+  Future<Nothing> update2;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureSatisfy(&update));
+    .WillOnce(FutureSatisfy(&update1))
+    .WillOnce(FutureSatisfy(&update2));
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
-  // Wait for TASK_RUNNING update.
-  AWAIT_READY(update);
+  // Wait for TASK_RUNNING updates from the tasks.
+  AWAIT_READY(update1);
+  AWAIT_READY(update2);
 
-  Future<TaskStatus> status;
+  // The master should generate TASK_LOST updates once the slave is stopped.
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
 
   this->Stop(slave.get());
 
-  // Scheduler should receive the TASK_LOST update.
-  AWAIT_READY(status);
-  ASSERT_EQ(TASK_LOST, status.get().state());
+  // Scheduler should receive the TASK_LOST updates.
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_LOST, status1.get().state());
+
+  AWAIT_READY(status2);
+  ASSERT_EQ(TASK_LOST, status2.get().state());
 
   driver.stop();
   driver.join();