Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/30 03:31:54 UTC
svn commit: r1477444 - in /incubator/mesos/trunk/src: master/master.cpp
master/master.hpp tests/fault_tolerance_tests.cpp
Author: bmahler
Date: Tue Apr 30 01:31:53 2013
New Revision: 1477444
URL: http://svn.apache.org/r1477444
Log:
Fixed the master to shut down deactivated slaves that send
further messages to the master.
Review: https://reviews.apache.org/r/10734
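In essence, the fix adds a guard to the slave-originated message handlers: a
message whose sender is not a registered slave is ignored, but if the sender
is in the master's deactivated set, the master also replies with a
ShutdownMessage, because frameworks were already told that the slave's tasks
were LOST. A minimal, self-contained sketch of this pattern (simplified
types only; the real master keeps a hashset of deactivated slave UPIDs and
looks up active slaves by SlaveID):

#include <iostream>
#include <set>
#include <string>

// Simplified stand-ins for the master's bookkeeping.
std::set<std::string> activeSlaves;
std::set<std::string> deactivatedSlaves;

void sendShutdown(const std::string& pid) {
  // In Mesos this would be send(pid, ShutdownMessage()).
  std::cout << "Sending ShutdownMessage to " << pid << std::endl;
}

// The guard this commit installs at the top of statusUpdate() and
// exitedExecutor(): returns true if normal handling should proceed.
bool checkSlave(const std::string& pid) {
  if (activeSlaves.count(pid) == 0) {
    if (deactivatedSlaves.count(pid) > 0) {
      // Frameworks already saw this slave's tasks as LOST, so the
      // slave itself must shut down rather than keep sending messages.
      sendShutdown(pid);
    } else {
      std::cout << "Ignoring message from unknown slave " << pid << std::endl;
    }
    return false;
  }
  return true;
}

int main() {
  deactivatedSlaves.insert("slave(1)@10.0.0.1:5051");
  checkSlave("slave(1)@10.0.0.1:5051"); // deactivated -> ShutdownMessage
  checkSlave("slave(2)@10.0.0.2:5051"); // unknown -> ignored
  return 0;
}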
Modified:
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1477444&r1=1477443&r2=1477444&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Tue Apr 30 01:31:53 2013
@@ -346,13 +346,6 @@ void Master::initialize()
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
- install<ExecutorToFrameworkMessage>(
- &Master::executorMessage,
- &ExecutorToFrameworkMessage::slave_id,
- &ExecutorToFrameworkMessage::framework_id,
- &ExecutorToFrameworkMessage::executor_id,
- &ExecutorToFrameworkMessage::data);
-
install<ExitedExecutorMessage>(
&Master::exitedExecutor,
&ExitedExecutorMessage::slave_id,
@@ -1049,92 +1042,68 @@ void Master::statusUpdate(const StatusUp
{
const TaskStatus& status = update.status();
- LOG(INFO) << "Status update from " << from
+ // NOTE: We cannot use 'from' here to identify the slave as this is
+ // now sent by the StatusUpdateManagerProcess. Only 'pid' can
+ // be used to identify the slave.
+ LOG(INFO) << "Status update from " << pid
<< ": task " << status.task_id()
<< " of framework " << update.framework_id()
<< " is now in state " << status.state();
Slave* slave = getSlave(update.slave_id());
- if (slave != NULL) {
- Framework* framework = getFramework(update.framework_id());
- if (framework != NULL) {
- // Pass on the (transformed) status update to the framework.
- StatusUpdateMessage message;
- message.mutable_update()->MergeFrom(update);
- message.set_pid(pid);
- send(framework->pid, message);
+ if (slave == NULL) {
+ if (deactivatedSlaves.contains(pid)) {
+ // If the slave is deactivated, we have already informed
+ // frameworks that its tasks were LOST, so the slave should
+ // shut down.
+ LOG(WARNING) << "Ignoring status update from deactivated slave " << pid
+ << " with id " << update.slave_id() << " ; asking slave "
+ << " to shutdown";
+ send(pid, ShutdownMessage());
+ } else {
+ LOG(WARNING) << "Ignoring status update from unknown slave " << pid
+ << " with id " << update.slave_id();
+ }
+ stats.invalidStatusUpdates++;
+ return;
+ }
- // Lookup the task and see if we need to update anything locally.
- Task* task = slave->getTask(update.framework_id(), status.task_id());
- if (task != NULL) {
- task->set_state(status.state());
-
- // Handle the task appropriately if it's terminated.
- if (status.state() == TASK_FINISHED ||
- status.state() == TASK_FAILED ||
- status.state() == TASK_KILLED ||
- status.state() == TASK_LOST) {
+ CHECK(!deactivatedSlaves.contains(pid));
- removeTask(task);
- }
+ Framework* framework = getFramework(update.framework_id());
+ if (framework == NULL) {
+ LOG(WARNING) << "Ignoring status update from " << pid << " ("
+ << slave->info.hostname() << "): error, couldn't lookup "
+ << "framework " << update.framework_id();
+ stats.invalidStatusUpdates++;
+ return;
+ }
- stats.tasks[status.state()]++;
+ // Pass on the (transformed) status update to the framework.
+ StatusUpdateMessage message;
+ message.mutable_update()->MergeFrom(update);
+ message.set_pid(pid);
+ send(framework->pid, message);
- stats.validStatusUpdates++;
- } else {
- LOG(WARNING) << "Status update from " << from << " ("
- << slave->info.hostname() << "): error, couldn't lookup "
- << "task " << status.task_id();
- stats.invalidStatusUpdates++;
- }
- } else {
- LOG(WARNING) << "Status update from " << from << " ("
- << slave->info.hostname() << "): error, couldn't lookup "
- << "framework " << update.framework_id();
- stats.invalidStatusUpdates++;
- }
- } else {
- LOG(WARNING) << "Status update from " << from
- << ": error, couldn't lookup slave "
- << update.slave_id();
+ // Lookup the task and see if we need to update anything locally.
+ Task* task = slave->getTask(update.framework_id(), status.task_id());
+ if (task == NULL) {
+ LOG(WARNING) << "Status update from " << pid << " ("
+ << slave->info.hostname() << "): error, couldn't lookup "
+ << "task " << status.task_id();
stats.invalidStatusUpdates++;
+ return;
}
-}
+ task->set_state(status.state());
-void Master::executorMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const string& data)
-{
- Slave* slave = getSlave(slaveId);
- if (slave != NULL) {
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Sending framework message from slave " << slaveId << " ("
- << slave->info.hostname() << ")"
- << " to framework " << frameworkId;
- ExecutorToFrameworkMessage message;
- message.mutable_slave_id()->MergeFrom(slaveId);
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_executor_id()->MergeFrom(executorId);
- message.set_data(data);
- send(framework->pid, message);
-
- stats.validFrameworkMessages++;
- } else {
- LOG(WARNING) << "Cannot send framework message from slave "
- << slaveId << " (" << slave->info.hostname()
- << ") to framework " << frameworkId
- << " because framework does not exist";
- stats.invalidFrameworkMessages++;
- }
- } else {
- LOG(WARNING) << "Cannot send framework message from slave "
- << slaveId << " to framework " << frameworkId
- << " because slave does not exist";
- stats.invalidFrameworkMessages++;
+ // Handle the task appropriately if it's terminated.
+ if (protobuf::isTerminalState(status.state())) {
+ removeTask(task);
}
+
+ stats.tasks[status.state()]++;
+ stats.validStatusUpdates++;
}
@@ -1146,41 +1115,55 @@ void Master::exitedExecutor(const SlaveI
// Only update the master's internal data structures here for proper accounting.
// The TASK_LOST updates are handled by the slave.
Slave* slave = getSlave(slaveId);
- if (slave != NULL) {
- // Tell the allocator about the recovered resources.
- if (slave->hasExecutor(frameworkId, executorId)) {
- ExecutorInfo executor = slave->executors[frameworkId][executorId];
-
- LOG(INFO) << "Executor " << executorId
- << " of framework " << frameworkId
- << " on slave " << slaveId
- << " (" << slave->info.hostname() << ")"
- << " exited with status " << status;
-
- allocator->resourcesRecovered(frameworkId,
- slaveId,
- Resources(executor.resources()));
-
- // Remove executor from slave and framework.
- slave->removeExecutor(frameworkId, executorId);
+ if (slave == NULL) {
+ if (deactivatedSlaves.contains(from)) {
+ // If the slave is deactivated, we have already informed
+ // frameworks that its tasks were LOST, so the slave should
+ // shut down.
+ LOG(WARNING) << "Ignoring exited executor '" << executorId
+ << "' of framework " << frameworkId
+ << " on deactivated slave " << slaveId
+ << " ; asking slave to shutdown";
+ reply(ShutdownMessage());
} else {
- LOG(WARNING) << "Ignoring unknown exited executor "
- << executorId << " on slave " << slaveId
- << " (" << slave->info.hostname() << ")";
+ LOG(WARNING) << "Ignoring exited executor '" << executorId
+ << "' of framework " << frameworkId
+ << " on unknown slave " << slaveId;
}
+ return;
+ }
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- framework->removeExecutor(slave->id, executorId);
+ CHECK(!deactivatedSlaves.contains(from));
- // TODO(benh): Send the framework it's executor's exit status?
- // Or maybe at least have something like
- // Scheduler::executorLost?
- }
- } else {
- LOG(INFO) << "Ignoring unknown exited executor " << executorId
+ // Tell the allocator about the recovered resources.
+ if (slave->hasExecutor(frameworkId, executorId)) {
+ ExecutorInfo executor = slave->executors[frameworkId][executorId];
+
+ LOG(INFO) << "Executor " << executorId
<< " of framework " << frameworkId
- << " on unknown slave " << slaveId;
+ << " on slave " << slaveId
+ << " (" << slave->info.hostname() << ")"
+ << " exited with status " << status;
+
+ allocator->resourcesRecovered(frameworkId,
+ slaveId,
+ Resources(executor.resources()));
+
+ // Remove executor from slave and framework.
+ slave->removeExecutor(frameworkId, executorId);
+ } else {
+ LOG(WARNING) << "Ignoring unknown exited executor "
+ << executorId << " on slave " << slaveId
+ << " (" << slave->info.hostname() << ")";
+ }
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ framework->removeExecutor(slave->id, executorId);
+
+ // TODO(benh): Send the framework its executor's exit status?
+ // Or maybe at least have something like
+ // Scheduler::executorLost?
}
}
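A small readability change worth noting above: the removed four-way
comparison against TASK_FINISHED, TASK_FAILED, TASK_KILLED and TASK_LOST is
replaced by a call to protobuf::isTerminalState(). A plausible sketch of
such a helper, assuming the TaskState values from mesos.proto:

#include <cassert>

// TaskState values as declared in mesos.proto (abridged).
enum TaskState {
  TASK_STAGING,
  TASK_STARTING,
  TASK_RUNNING,
  TASK_FINISHED,
  TASK_FAILED,
  TASK_KILLED,
  TASK_LOST
};

// Sketch of protobuf::isTerminalState(): a task in any of these states
// will never transition again, so its resources can be recovered.
bool isTerminalState(TaskState state) {
  return state == TASK_FINISHED ||
         state == TASK_FAILED ||
         state == TASK_KILLED ||
         state == TASK_LOST;
}

int main() {
  assert(isTerminalState(TASK_KILLED));
  assert(!isTerminalState(TASK_RUNNING));
  return 0;
}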
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1477444&r1=1477443&r2=1477444&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Tue Apr 30 01:31:53 2013
@@ -107,10 +107,6 @@ public:
const std::vector<Task>& tasks);
void unregisterSlave(const SlaveID& slaveId);
void statusUpdate(const StatusUpdate& update, const UPID& pid);
- void executorMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const std::string& data);
void exitedExecutor(const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
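For context on the handler removed here: ProtobufProcess::install<M>()
registers a member function to be invoked when a message of type M arrives,
with the listed message fields unpacked into the handler's arguments. A
self-contained sketch of that dispatch idea, using plain structs and a
hypothetical deliver() in place of protobuf and libprocess:

#include <iostream>
#include <string>

// Stand-in for the generated ExitedExecutorMessage protobuf.
struct ExitedExecutorMessage {
  std::string slave_id;
  std::string framework_id;
  std::string executor_id;
  int status;
};

class Master {
public:
  void exitedExecutor(const std::string& slaveId,
                      const std::string& frameworkId,
                      const std::string& executorId,
                      int status) {
    std::cout << "Executor " << executorId
              << " of framework " << frameworkId
              << " on slave " << slaveId
              << " exited with status " << status << std::endl;
  }
};

// Hypothetical analog of install<M>(handler, &M::field...): extract each
// named field from the message and forward it to the bound handler.
template <typename M, typename Handler, typename... Fields>
void deliver(Master& master, Handler handler,
             const M& message, Fields... fields) {
  (master.*handler)(message.*fields...);
}

int main() {
  Master master;
  ExitedExecutorMessage message = {"slave-1", "framework-1", "exec-1", 0};
  deliver(master, &Master::exitedExecutor, message,
          &ExitedExecutorMessage::slave_id,
          &ExitedExecutorMessage::framework_id,
          &ExitedExecutorMessage::executor_id,
          &ExitedExecutorMessage::status);
  return 0;
}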
Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1477444&r1=1477443&r2=1477444&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Tue Apr 30 01:31:53 2013
@@ -25,13 +25,17 @@
#include <vector>
#include <mesos/executor.hpp>
+#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
+#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <stout/stringify.hpp>
+
#include "common/protobuf_utils.hpp"
#include "detector/detector.hpp"
@@ -42,6 +46,7 @@
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
+#include "slave/isolator.hpp"
#include "slave/slave.hpp"
#include "tests/utils.hpp"
@@ -55,6 +60,7 @@ using mesos::internal::master::Allocator
using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
+using mesos::internal::slave::Isolator;
using mesos::internal::slave::Slave;
using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL;
@@ -62,6 +68,7 @@ using process::Clock;
using process::Future;
using process::Message;
using process::PID;
+using process::UPID;
using std::string;
using std::map;
@@ -220,10 +227,6 @@ TEST_F(FaultToleranceClusterTest, Partit
Try<PID<Master> > master = cluster.masters.start();
ASSERT_SOME(master);
- MockExecutor exec;
- Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
- ASSERT_SOME(slave);
-
// Allow the master to PING the slave, but drop all PONG messages
// from the slave. Note that we don't match on the master / slave
// PIDs because it's actually the SlaveObserver Process that sends
@@ -231,7 +234,9 @@ TEST_F(FaultToleranceClusterTest, Partit
Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
DROP_MESSAGES(Eq("PONG"), _, _);
- BasicMasterDetector detector(master.get(), slave.get(), true);
+ MockExecutor exec;
+ Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+ ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
@@ -287,7 +292,7 @@ TEST_F(FaultToleranceClusterTest, Partit
// Drop the first shutdown message from the master (simulated
// partition), allow the second shutdown message to pass when
- // the slave re-registers,
+ // the slave re-registers.
Future<ShutdownMessage> shutdownSlave =
DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
@@ -312,6 +317,7 @@ TEST_F(FaultToleranceClusterTest, Partit
}
ping = FUTURE_MESSAGE(Eq("PING"), _, _);
Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
}
Clock::advance(master::SLAVE_PING_TIMEOUT);
@@ -357,6 +363,259 @@ TEST_F(FaultToleranceClusterTest, Partit
}
+// The purpose of this test is to ensure that when slaves are removed
+// from the master and then attempt to send status updates, we send
+// a ShutdownMessage to the slave. Why? Because during a network
+// partition, the master removes a partitioned slave, transitioning
+// its tasks to LOST. When the partition heals, the slave may attempt
+// to send updates if it was unaware that the master deactivated it.
+// We've already notified frameworks that these tasks were LOST, so
+// we must have the slave shut down.
+TEST_F(FaultToleranceClusterTest, PartitionedSlaveStatusUpdates)
+{
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ // Allow the master to PING the slave, but drop all PONG messages
+ // from the slave. Note that we don't match on the master / slave
+ // PIDs because it's actually the SlaveObserver Process that sends
+ // the pings.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ MockExecutor exec;
+ Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+ SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(Return());
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ // Drop the first shutdown message from the master (simulated
+ // partition), allow the second shutdown message to pass when
+ // the slave sends an update.
+ Future<ShutdownMessage> shutdownSlave =
+ DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .WillRepeatedly(Return());
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ Clock::pause();
+
+ // Now, induce a partition of the slave by having the master
+ // timeout the slave.
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+ }
+
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // Wait for the master to attempt to shut down the slave.
+ AWAIT_READY(shutdownSlave);
+
+ // The master will notify the framework that the slave was lost.
+ AWAIT_READY(slaveLost);
+
+ shutdownSlave = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ // At this point, the slave still thinks it's registered, so we
+ // simulate a status update coming from the slave.
+ StatusUpdateMessage statusUpdate;
+ statusUpdate.set_pid(stringify(slave.get()));
+ statusUpdate.mutable_update()->mutable_framework_id()->set_value(
+ frameworkId.get().value());
+ statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
+ statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
+ statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
+ "task_id");
+ statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
+ statusUpdate.mutable_update()->set_timestamp(Clock::now());
+ statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
+ process::post(master.get(), statusUpdate);
+
+ // The master should shut down the slave upon receiving the update.
+ AWAIT_READY(shutdownSlave);
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ cluster.shutdown();
+}
+
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master and then attempt to send exited executor messages,
+// we send a ShutdownMessage to the slave. Why? Because during a
+// network partition, the master removes a partitioned slave,
+// transitioning its tasks to LOST. When the partition heals, the
+// slave may attempt to send exited executor messages if it was
+// unaware that the master deactivated it. We've already notified
+// frameworks that the tasks under the executors were LOST, so we
+// must have the slave shut down.
+TEST_F(FaultToleranceClusterTest, PartitionedSlaveExitedExecutor)
+{
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ // Allow the master to PING the slave, but drop all PONG messages
+ // from the slave. Note that we don't match on the master / slave
+ // PIDs because it's actually the SlaveObserver Process that sends
+ // the pings.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ MockExecutor exec;
+ TestingIsolator* isolator = new TestingIsolator(DEFAULT_EXECUTOR_ID, &exec);
+ process::spawn(isolator);
+ Try<PID<Slave> > slave = cluster.slaves.start(isolator);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ // Launch a task. This allows us to have the slave send an
+ // ExitedExecutorMessage.
+ TaskID taskId;
+ taskId.set_value("1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->MergeFrom(taskId);
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+ task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Set up the expectations for launching the task.
+ EXPECT_CALL(exec, registered(_, _, _, _));
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Drop all the status updates from the slave, so that we can
+ // ensure the ExitedExecutorMessage is what triggers the slave
+ // shutdown.
+ DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Drop the first shutdown message from the master (simulated
+ // partition) and allow the second shutdown message to pass when
+ // triggered by the ExitedExecutorMessage.
+ Future<ShutdownMessage> shutdownSlave =
+ DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ Future<TaskStatus> lostStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&lostStatus));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ Clock::pause();
+
+ // Now, induce a partition of the slave by having the master
+ // timeout the slave.
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+ }
+
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // The master will have notified the framework of the lost task.
+ AWAIT_READY(lostStatus);
+ EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+ // Wait for the master to attempt to shut down the slave.
+ AWAIT_READY(shutdownSlave);
+
+ // The master will notify the framework that the slave was lost.
+ AWAIT_READY(slaveLost);
+
+ shutdownSlave = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ // Induce an ExitedExecutorMessage from the slave.
+ dispatch(isolator,
+ &Isolator::killExecutor,
+ frameworkId.get(),
+ DEFAULT_EXECUTOR_INFO.executor_id());
+
+ // Upon receiving the message, the master will shut down the slave.
+ AWAIT_READY(shutdownSlave);
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ cluster.shutdown();
+
+ // TODO(benh): Terminate and wait for the isolator once the slave
+ // is no longer doing so.
+ // process::terminate(isolator);
+ // process::wait(isolator);
+ delete isolator;
+}
+
+
TEST_F(FaultToleranceTest, SchedulerFailover)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);