You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/31 17:39:41 UTC
svn commit: r1404245 - in /incubator/mesos/trunk/src: sched/sched.cpp
tests/master_tests.cpp
Author: benh
Date: Wed Oct 31 16:39:40 2012
New Revision: 1404245
URL: http://svn.apache.org/viewvc?rev=1404245&view=rev
Log:
Fixed bug where a status update acknowledgment is sent after the
driver has been aborted.
From: Vinod Kone <vi...@gmail.com>
Review: https://reviews.apache.org/r/7782
Modified:
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/tests/master_tests.cpp
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1404245&r1=1404244&r2=1404245&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Wed Oct 31 16:39:40 2012
@@ -266,7 +266,7 @@ protected:
send(master, message);
}
- delay(Seconds(1.0), self(), &SchedulerProcess::doReliableRegistration);
+ delay(Seconds(1.0), self(), &Self::doReliableRegistration);
}
void resourceOffers(const vector<Offer>& offers,
@@ -340,19 +340,30 @@ protected:
scheduler->statusUpdate(driver, status);
- // Send a status update acknowledgement ONLY if not aborted!
- if (!aborted && pid) {
- // Acknowledge the message (we do this last, after we invoked
- // the scheduler, if we did at all, in case it causes a crash,
- // since this way the message might get resent/routed after the
- // scheduler comes back online).
- StatusUpdateAcknowledgementMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
- message.mutable_slave_id()->MergeFrom(update.slave_id());
- message.mutable_task_id()->MergeFrom(status.task_id());
- message.set_uuid(update.uuid());
- send(pid, message);
+ // Acknowledge the status update.
+ // NOTE: We do a dispatch here instead of directly sending the ACK because,
+ // we want to avoid sending the ACK if the driver was aborted when we
+ // made the statusUpdate call. This works because, the 'abort' message will
+ // be enqueued before the ACK message is processed.
+ if (pid) {
+ dispatch(self(), &Self::statusUpdateAcknowledgement, update, pid);
+ }
+ }
+
+ void statusUpdateAcknowledgement(const StatusUpdate& update, const UPID& pid)
+ { // Sends the ACK for 'update' to 'pid'; reached via dispatch() after the scheduler's statusUpdate callback, not called inline.
+ if (aborted) { // Checked when this dispatched event is processed, so an abort enqueued before it suppresses the ACK.
+ VLOG(1) << "Not sending status update acknowledgment message because "
+ << "the driver is aborted!";
+ return;
}
+
+ StatusUpdateAcknowledgementMessage message; // Identifies the acknowledged update by framework id, slave id, task id and uuid.
+ message.mutable_framework_id()->MergeFrom(framework.id());
+ message.mutable_slave_id()->MergeFrom(update.slave_id());
+ message.mutable_task_id()->MergeFrom(update.status().task_id());
+ message.set_uuid(update.uuid());
+ send(pid, message);
}
void lostSlave(const SlaveID& slaveId)
Modified: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1404245&r1=1404244&r2=1404245&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Wed Oct 31 16:39:40 2012
@@ -263,6 +263,103 @@ TEST(MasterTest, KillTask)
}
+// Verifies that after the scheduler driver delivers a status update to the
+// scheduler (statusUpdate callback), a StatusUpdateAcknowledgementMessage is
+// observed, i.e. the driver acknowledges the update.
+TEST(MasterTest, StatusUpdateAck)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ TestAllocatorProcess a;
+ Files files;
+ Master m(&a, &files);
+ PID<Master> master = process::spawn(&m);
+
+ MockExecutor exec;
+
+ // Fires once the ACK message is seen. Return(false) presumably lets the
+ // message through rather than dropping it -- confirm against the
+ // EXPECT_MESSAGE filter semantics.
+ trigger statusUpdateAckMsg;
+ EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
+ Return(false)));
+
+ trigger shutdownCall;
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ // The launched task immediately reports TASK_RUNNING, which produces the
+ // status update that should be acknowledged.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(Trigger(&shutdownCall));
+
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+ TestingIsolationModule isolationModule(execs);
+
+ Resources resources = Resources::parse("cpus:2;mem:1024");
+
+ Slave s(resources, true, &isolationModule, &files);
+ PID<Slave> slave = process::spawn(&s);
+
+ BasicMasterDetector detector(master, slave, true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+
+ vector<Offer> offers;
+ TaskStatus status;
+
+ trigger resourceOffersCall, statusUpdateCall;
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(DoAll(SaveArg<1>(&offers),
+ Trigger(&resourceOffersCall)))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(DoAll(SaveArg<1>(&status),
+ Trigger(&statusUpdateCall)));
+
+ driver.start();
+
+ WAIT_UNTIL(resourceOffersCall);
+
+ // Must be a fatal assertion: offers[0] is dereferenced below, and an
+ // EXPECT_* failure would fall through into an out-of-bounds access.
+ ASSERT_NE(0u, offers.size());
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ driver.launchTasks(offers[0].id(), tasks);
+
+ WAIT_UNTIL(statusUpdateCall);
+
+ EXPECT_EQ(TASK_RUNNING, status.state());
+
+ // Ensure we get a status update ACK.
+ WAIT_UNTIL(statusUpdateAckMsg);
+
+ driver.stop();
+ driver.join();
+
+ WAIT_UNTIL(shutdownCall); // Ensures MockExecutor can be deallocated.
+
+ process::terminate(slave);
+ process::wait(slave);
+
+ process::terminate(master);
+ process::wait(master);
+}
+
+
TEST(MasterTest, RecoverResources)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);