You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/05/29 01:40:11 UTC

git commit: Updated the slave to send terminal tasks for terminated executors during re-registration.

Repository: mesos
Updated Branches:
  refs/heads/master 4ef409dee -> 31e382e95


Updated the slave to send terminal tasks for terminated executors
during re-registration.

The slave must send terminal unacknowledged tasks for terminal
executors so that the master can correctly reconcile tasks when
the slave re-registers.

Otherwise, the master may treat a terminal, unacknowledged task as
missing from the slave and incorrectly inform the framework via
TASK_LOST, per MESOS-1388.

Review: https://reviews.apache.org/r/21991


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31e382e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31e382e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31e382e9

Branch: refs/heads/master
Commit: 31e382e95faf1ee1ebb8901bb94ba2b5cefa8607
Parents: 4ef409d
Author: Ben Mahler <be...@gmail.com>
Authored: Wed May 28 15:51:42 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed May 28 15:52:35 2014 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                 | 27 ++++++++++----------
 src/tests/fault_tolerance_tests.cpp | 44 +++++++++++++++++++++-----------
 2 files changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/31e382e9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7fbedb1..c5c0513 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -803,8 +803,7 @@ void Slave::doReliableRegistration(const Duration& duration)
   CHECK(state == DISCONNECTED || state == TERMINATING) << state;
 
   if (info.id() == "") {
-    // Slave started before master.
-    // (Vinod): Is the above comment true?
+    // Registering for the first time.
     RegisterSlaveMessage message;
     message.mutable_slave()->CopyFrom(info);
     send(master.get(), message);
@@ -816,13 +815,9 @@ void Slave::doReliableRegistration(const Duration& duration)
 
     foreachvalue (Framework* framework, frameworks) {
       foreachvalue (Executor* executor, framework->executors) {
-        // Ignore terminated executors because they do not consume
-        // any resources.
-        if (executor->state == Executor::TERMINATED) {
-          continue;
-        }
-
         // Add launched, terminated, and queued tasks.
+        // Note that terminated executors will only have terminated
+        // unacknowledged tasks.
         foreach (Task* task, executor->launchedTasks.values()) {
           message.add_tasks()->CopyFrom(*task);
         }
@@ -844,12 +839,16 @@ void Slave::doReliableRegistration(const Duration& duration)
             message.mutable_tasks(i)->clear_executor_id();
           }
         } else {
-          ExecutorInfo* executorInfo = message.add_executor_infos();
-          executorInfo->MergeFrom(executor->info);
-
-          // Scheduler Driver will ensure the framework id is set in
-          // ExecutorInfo, effectively making it a required field.
-          CHECK(executorInfo->has_framework_id());
+          // Ignore terminated executors because they do not consume
+          // any resources.
+          if (executor->state != Executor::TERMINATED) {
+            ExecutorInfo* executorInfo = message.add_executor_infos();
+            executorInfo->MergeFrom(executor->info);
+
+            // Scheduler Driver will ensure the framework id is set in
+            // ExecutorInfo, effectively making it a required field.
+            CHECK(executorInfo->has_framework_id());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/31e382e9/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 3f26393..e484a8a 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1850,11 +1850,11 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
 }
 
 
-// This test verifies that a re-registering slave does not inform
-// the master about a terminated executor (and its tasks), when the
-// executor has pending updates. We check this by ensuring that the
-// master sends a TASK_LOST update for the task belonging to the
-// terminated executor.
+// This test verifies that a re-registering slave sends the terminal
+// unacknowledged tasks for a terminal executor. This is required
+// for the master to correctly reconcile its view with the slave's
+// view of tasks. This test drops a terminal update to the master
+// and then forces the slave to re-register.
 TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
 {
   Try<PID<Master> > master = StartMaster();
@@ -1880,7 +1880,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
     .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
-  EXPECT_CALL(exec, registered(_, _, _, _));
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
 
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
@@ -1894,29 +1896,41 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
   AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
-  // Drop the TASK_LOST status update(s) sent to the master.
-  // This ensures that the TASK_LOST received by the scheduler
-  // is generated by the master.
-  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+  // Drop the TASK_FINISHED status update sent to the master.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
 
   Future<ExitedExecutorMessage> executorExitedMessage =
     FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
 
+  Clock::pause();
+
+  TaskStatus finishedStatus;
+  finishedStatus = status.get();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure the update was sent.
+  AWAIT_READY(statusUpdateMessage);
+
   // Now kill the executor.
   containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
 
-  AWAIT_READY(executorExitedMessage);
-
-  // Simulate a spurious master change event (e.g., due to ZooKeeper
-  // expiration) at the slave to force re-registration.
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status2));
 
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get());
+
   detector.appoint(master.get());
 
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
   AWAIT_READY(status2);
-  EXPECT_EQ(TASK_LOST, status2.get().state());
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
 
   driver.stop();
   driver.join();