You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/07 04:58:06 UTC

[5/5] git commit: Fixed master to properly reconcile KillTask messages.

Fixed master to properly reconcile KillTask messages.

Review: https://reviews.apache.org/r/13339


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a47b58e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a47b58e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a47b58e7

Branch: refs/heads/master
Commit: a47b58e72f0ce4eac459ca7960316c1f0b8ad12f
Parents: d88f1e3
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Aug 6 12:56:01 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 6 19:57:27 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp              | 53 +++++++++++++++-----
 src/master/master.hpp              | 13 +++--
 src/tests/slave_recovery_tests.cpp | 86 +++++++++++++++++++++++++++++++++
 3 files changed, 135 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a47b58e7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c5f8a93..a2f8929 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -905,17 +905,24 @@ void Master::killTask(const FrameworkID& frameworkId,
       Slave* slave = getSlave(task->slave_id());
       CHECK(slave != NULL);
 
-      // TODO(vinod): Reconcile KillTaskMessages that are not received
-      // by the slave (e.g., disconnected, partitioned).
-      LOG(INFO) << "Telling slave " << slave->id << " ("
-                << slave->info.hostname() << ")"
-                << " to kill task " << taskId
-                << " of framework " << frameworkId;
+      // We add the task to 'killedTasks' here because the slave
+      // might be partitioned or disconnected but the master
+      // doesn't know it yet.
+      slave->killedTasks.put(frameworkId, taskId);
 
-      KillTaskMessage message;
-      message.mutable_framework_id()->MergeFrom(frameworkId);
-      message.mutable_task_id()->MergeFrom(taskId);
-      send(slave->pid, message);
+      // NOTE: This task will be properly reconciled when the
+      // disconnected slave re-registers with the master.
+      if (!slave->disconnected) {
+        LOG(INFO) << "Telling slave " << slave->id << " ("
+                  << slave->info.hostname() << ")"
+                  << " to kill task " << taskId
+                  << " of framework " << frameworkId;
+
+        KillTaskMessage message;
+        message.mutable_framework_id()->MergeFrom(frameworkId);
+        message.mutable_task_id()->MergeFrom(taskId);
+        send(slave->pid, message);
+      }
     } else {
       // TODO(benh): Once the scheduler has persistance and
       // high-availability of it's tasks, it will be the one that
@@ -1093,9 +1100,6 @@ void Master::reregisterSlave(const SlaveID& slaveId,
         allocator->slaveAdded(slaveId, slaveInfo, resources);
       }
 
-      // Reconcile tasks between master and the slave.
-      reconcileTasks(slave, tasks);
-
       SlaveReregisteredMessage message;
       message.mutable_slave_id()->MergeFrom(slave->id);
       reply(message);
@@ -1103,6 +1107,11 @@ void Master::reregisterSlave(const SlaveID& slaveId,
       // Update the slave pid and relink to it.
       slave->pid = from;
       link(slave->pid);
+
+      // Reconcile tasks between master and the slave.
+      // NOTE: This needs to be done after the registration message is
+      // sent to the slave and the new pid is linked.
+      reconcileTasks(slave, tasks);
     } else {
       // NOTE: This handles the case when the slave tries to
       // re-register with a failed over master.
@@ -1800,6 +1809,24 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
       removeTask(task);
     }
   }
+
+  // Send KillTaskMessages for tasks in 'killedTasks' that are
+  // still alive on the slave. This could happen if the slave
+  // did not receive KillTaskMessage because of a partition or
+  // disconnection.
+  foreach (const Task& task, tasks) {
+    if (!protobuf::isTerminalState(task.state()) &&
+        slave->killedTasks.contains(task.framework_id(), task.task_id())) {
+      LOG(WARNING) << " Slave " << slave->id << " (" << slave->info.hostname()
+                   << ") has non-terminal task " << task.task_id()
+                   << " that is supposed to be killed. Killing it now!";
+
+      KillTaskMessage message;
+      message.mutable_framework_id()->MergeFrom(task.framework_id());
+      message.mutable_task_id()->MergeFrom(task.task_id());
+      send(slave->pid, message);
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a47b58e7/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8824822..fb99603 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -309,7 +309,7 @@ struct Slave
     tasks[key] = task;
     LOG(INFO) << "Adding task " << task->task_id()
               << " with resources " << task->resources()
-              << " on slave " << id;
+              << " on slave " << id << " (" << info.hostname() << ")";
     resourcesInUse += task->resources();
   }
 
@@ -319,9 +319,10 @@ struct Slave
       std::make_pair(task->framework_id(), task->task_id());
     CHECK(tasks.count(key) > 0);
     tasks.erase(key);
+    killedTasks.remove(task->framework_id(), task->task_id());
     LOG(INFO) << "Removing task " << task->task_id()
               << " with resources " << task->resources()
-              << " on slave " << id;
+              << " on slave " << id << " (" << info.hostname() << ")";
     resourcesInUse -= task->resources();
   }
 
@@ -331,7 +332,7 @@ struct Slave
     offers.insert(offer);
     VLOG(1) << "Adding offer " << offer->id()
             << " with resources " << offer->resources()
-            << " on slave " << id;
+            << " on slave " << id << " (" << info.hostname() << ")";
     resourcesOffered += offer->resources();
   }
 
@@ -341,7 +342,7 @@ struct Slave
     offers.erase(offer);
     VLOG(1) << "Removing offer " << offer->id()
             << " with resources " << offer->resources()
-            << " on slave " << id;
+            << " on slave " << id << " (" << info.hostname() << ")";
     resourcesOffered -= offer->resources();
   }
 
@@ -400,6 +401,10 @@ struct Slave
   // We should find a way to eliminate this.
   hashmap<std::pair<FrameworkID, TaskID>, Task*> tasks;
 
+  // Tasks that frameworks have asked the master to kill.
+  // This is used for reconciliation when the slave re-registers.
+  multihashmap<FrameworkID, TaskID> killedTasks;
+
   // Active offers on this slave.
   hashset<Offer*> offers;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a47b58e7/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6172cba..bd755f6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1498,3 +1498,89 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave)
 
   this->Shutdown(); // Shutdown before isolator(s) get deallocated.
 }
+
+
+// Verifies that a kill request the master receives while a
+// checkpointing slave is down is reconciled on re-registration:
+// the scheduler must eventually see TASK_KILLED for the task.
+TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  Future<RegisterSlaveMessage> registerSlaveMessage =
+      FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
+  TypeParam isolator1;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(registerSlaveMessage);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task ("sleep 1000").
+
+  // Capture the slave and framework ids (not used later in this test).
+  SlaveID slaveId = offers.get()[0].slave_id();
+  FrameworkID frameworkId = offers.get()[0].framework_id();
+
+  EXPECT_CALL(sched, statusUpdate(_, _)); // TASK_RUNNING
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait for the TASK_RUNNING update to be acknowledged.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  this->Stop(slave.get());
+
+  // Now ask the master to kill the task. The kill will not be
+  // received by the slave because it is down.
+  driver.killTask(task.task_id());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Now restart the slave (use same flags) with a new isolator.
+  TypeParam isolator2;
+
+  slave = this->StartSlave(&isolator2, flags);
+  ASSERT_SOME(slave);
+
+  // Scheduler should get a TASK_KILLED status update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_KILLED, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}