You are viewing a plain-text version of this content. The link to its canonical version (originally a hyperlink on the word "here") is not preserved in this plain-text rendering.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/31 17:39:41 UTC

svn commit: r1404245 - in /incubator/mesos/trunk/src: sched/sched.cpp tests/master_tests.cpp

Author: benh
Date: Wed Oct 31 16:39:40 2012
New Revision: 1404245

URL: http://svn.apache.org/viewvc?rev=1404245&view=rev
Log:
Fixed a bug where a status update acknowledgement could be sent after the
driver had been aborted.

From: Vinod Kone <vi...@gmail.com>
Review: https://reviews.apache.org/r/7782

Modified:
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/tests/master_tests.cpp

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1404245&r1=1404244&r2=1404245&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Wed Oct 31 16:39:40 2012
@@ -266,7 +266,7 @@ protected:
       send(master, message);
     }
 
-    delay(Seconds(1.0), self(), &SchedulerProcess::doReliableRegistration);
+    delay(Seconds(1.0), self(), &Self::doReliableRegistration);
   }
 
   void resourceOffers(const vector<Offer>& offers,
@@ -340,19 +340,30 @@ protected:
 
     scheduler->statusUpdate(driver, status);
 
-    // Send a status update acknowledgement ONLY if not aborted!
-    if (!aborted && pid) {
-      // Acknowledge the message (we do this last, after we invoked
-      // the scheduler, if we did at all, in case it causes a crash,
-      // since this way the message might get resent/routed after the
-      // scheduler comes back online).
-      StatusUpdateAcknowledgementMessage message;
-      message.mutable_framework_id()->MergeFrom(framework.id());
-      message.mutable_slave_id()->MergeFrom(update.slave_id());
-      message.mutable_task_id()->MergeFrom(status.task_id());
-      message.set_uuid(update.uuid());
-      send(pid, message);
+    // Acknowledge the status update.
+    // NOTE: We do a dispatch here instead of directly sending the ACK because,
+    // we want to avoid sending the ACK if the driver was aborted when we
+    // made the statusUpdate call. This works because, the 'abort' message will
+    // be enqueued before the ACK message is processed.
+    if (pid) {
+      dispatch(self(), &Self::statusUpdateAcknowledgement, update, pid);
+    }
+  }
+
+  void statusUpdateAcknowledgement(const StatusUpdate& update, const UPID& pid)
+  {
+    if (aborted) {
+      VLOG(1) << "Not sending status update acknowledgment message because "
+              << "the driver is aborted!";
+      return;
     }
+
+    StatusUpdateAcknowledgementMessage message;
+    message.mutable_framework_id()->MergeFrom(framework.id());
+    message.mutable_slave_id()->MergeFrom(update.slave_id());
+    message.mutable_task_id()->MergeFrom(update.status().task_id());
+    message.set_uuid(update.uuid());
+    send(pid, message);
   }
 
   void lostSlave(const SlaveID& slaveId)

Modified: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1404245&r1=1404244&r2=1404245&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Wed Oct 31 16:39:40 2012
@@ -263,6 +263,103 @@ TEST(MasterTest, KillTask)
 }
 
 
+TEST(MasterTest, StatusUpdateAck)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  TestAllocatorProcess a;
+  Files files;
+  Master m(&a, &files);
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  trigger statusUpdateAckMsg;
+  EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
+                    Return(false)));
+
+  trigger shutdownCall;
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(Trigger(&shutdownCall));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule, &files);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+
+  vector<Offer> offers;
+  TaskStatus status;
+
+  trigger resourceOffersCall, statusUpdateCall;
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status),
+                    Trigger(&statusUpdateCall)));
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall);
+
+  EXPECT_NE(0u, offers.size());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  driver.launchTasks(offers[0].id(), tasks);
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  // Ensure we get a status update ACK.
+  WAIT_UNTIL(statusUpdateAckMsg);
+
+  driver.stop();
+  driver.join();
+
+  WAIT_UNTIL(shutdownCall); // Ensures MockExecutor can be deallocated.
+
+  process::terminate(slave);
+  process::wait(slave);
+
+  process::terminate(master);
+  process::wait(master);
+}
+
+
 TEST(MasterTest, RecoverResources)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);