You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/30 03:31:27 UTC
svn commit: r1477442 - in /incubator/mesos/trunk/src: detector/detector.cpp
master/http.cpp master/master.cpp master/master.hpp
tests/fault_tolerance_tests.cpp tests/master_detector_tests.cpp
tests/zookeeper_tests.cpp
Author: bmahler
Date: Tue Apr 30 01:31:27 2013
New Revision: 1477442
URL: http://svn.apache.org/r1477442
Log:
Send NoMasterDetectedMessage on session timeout to non-contending
detectors. Added a disconnected slave map to the master to track
disconnected slaves, in order to disallow slave re-registration after
a network partition.
Review: https://reviews.apache.org/r/10172
Modified:
incubator/mesos/trunk/src/detector/detector.cpp
incubator/mesos/trunk/src/master/http.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
incubator/mesos/trunk/src/tests/master_detector_tests.cpp
incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Tue Apr 30 01:31:27 2013
@@ -388,22 +388,11 @@ void ZooKeeperMasterDetectorProcess::tim
timer = None();
expire = true;
- // We only send a NoMasterDetectedMessage if we are a
- // contending detector. This is because:
- // If we are a non-contending detector (e.g. slave), a zk session
- // expiration doesn't necessarily mean a new leader (master) is elected
- // (e.g. the slave is partitioned from the zk server). If the leading
- // master stays the same (i.e., no leader election), then the
- // slave should still accept a ShutDownMessage from the master.
- // If a new master does get elected, the slave would know about it
- // because it would do a leader detection after it connects/re-connects.
- if (contend) {
- // TODO(bmahler): We always want to clear the sequence number
- // prior to sending NoMasterDetectedMessage. It might be prudent
- // to use a helper function to enforce this.
- currentMasterSeq = ""; // Clear the master sequence number.
- process::post(pid, NoMasterDetectedMessage());
- }
+ // TODO(bmahler): We always want to clear the sequence number
+ // prior to sending NoMasterDetectedMessage. It might be prudent
+ // to use a helper function to enforce this.
+ currentMasterSeq = ""; // Clear the master sequence number.
+ process::post(pid, NoMasterDetectedMessage());
}
}
Modified: incubator/mesos/trunk/src/master/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.cpp (original)
+++ incubator/mesos/trunk/src/master/http.cpp Tue Apr 30 01:31:27 2013
@@ -265,7 +265,8 @@ Future<Response> stats(
object.values["elected"] = master.elected; // Note: using int not bool.
object.values["total_schedulers"] = master.frameworks.size();
object.values["active_schedulers"] = master.getActiveFrameworks().size();
- object.values["activated_slaves"] = master.slaveHostnamePorts.size();
+ object.values["activated_slaves"] = master.slavePIDs.size();
+ object.values["deactivated_slaves"] = master.deactivatedSlavePIDs.size();
object.values["connected_slaves"] = master.slaves.size();
object.values["staged_tasks"] = master.stats.tasks[TASK_STAGING];
object.values["started_tasks"] = master.stats.tasks[TASK_STARTING];
@@ -317,7 +318,8 @@ Future<Response> state(
object.values["start_time"] = master.startTime;
object.values["id"] = master.info.id();
object.values["pid"] = string(master.self());
- object.values["activated_slaves"] = master.slaveHostnamePorts.size();
+ object.values["activated_slaves"] = master.slavePIDs.size();
+ object.values["deactivated_slaves"] = master.deactivatedSlavePIDs.size();
object.values["connected_slaves"] = master.slaves.size();
object.values["staged_tasks"] = master.stats.tasks[TASK_STAGING];
object.values["started_tasks"] = master.stats.tasks[TASK_STARTING];
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Tue Apr 30 01:31:27 2013
@@ -173,10 +173,7 @@ protected:
void deactivate()
{
dispatch(
- master,
- &Master::deactivatedSlaveHostnamePort,
- slaveInfo.hostname(),
- slave.port);
+ master, &Master::deactivateSlave, slaveInfo.hostname(), slave.port);
}
private:
@@ -888,7 +885,6 @@ void Master::registerSlave(const SlaveIn
// TODO(benh): We assume all slaves can register for now.
CHECK(flags.slaves == "*");
- activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
addSlave(slave);
// // Checks if this slave, or if all slaves, can be accepted.
@@ -917,7 +913,15 @@ void Master::reregisterSlave(const Slave
}
if (slaveId == "") {
- LOG(ERROR) << "Slave re-registered without an id!";
+ LOG(ERROR) << "Slave " << from << " re-registered without an id!";
+ reply(ShutdownMessage());
+ } else if (deactivatedSlavePIDs.contains(slaveInfo.hostname(), from.port)) {
+ // We disallow deactivated slaves from re-registering. This is
+ // to ensure that when a master deactivates a slave that was
+ // partitioned, we don't allow the slave to re-register, as we've
+ // already informed frameworks that the tasks were lost.
+ LOG(ERROR) << "Slave " << from
+ << " attempted to re-register after deactivation";
reply(ShutdownMessage());
} else {
Slave* slave = getSlave(slaveId);
@@ -995,7 +999,6 @@ void Master::reregisterSlave(const Slave
// TODO(benh): We assume all slaves can register for now.
CHECK(flags.slaves == "*");
- activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
readdSlave(slave, executorInfos, tasks);
// // Checks if this slave, or if all slaves, can be accepted.
@@ -1183,18 +1186,9 @@ void Master::exitedExecutor(const SlaveI
}
-void Master::activatedSlaveHostnamePort(const string& hostname, uint16_t port)
-{
- LOG(INFO) << "Master now considering a slave at "
- << hostname << ":" << port << " as active";
- slaveHostnamePorts.put(hostname, port);
-}
-
-
-void Master::deactivatedSlaveHostnamePort(const string& hostname,
- uint16_t port)
+void Master::deactivateSlave(const string& hostname, uint16_t port)
{
- if (slaveHostnamePorts.contains(hostname, port)) {
+ if (slavePIDs.contains(hostname, port)) {
// Look for a connected slave and remove it.
foreachvalue (Slave* slave, slaves) {
if (slave->info.hostname() == hostname && slave->pid.port == port) {
@@ -1209,7 +1203,6 @@ void Master::deactivatedSlaveHostnamePor
LOG(INFO) << "Master now considering a slave at "
<< hostname << ":" << port << " as inactive";
- slaveHostnamePorts.remove(hostname, port);
}
}
@@ -1855,6 +1848,8 @@ void Master::addSlave(Slave* slave, bool
<< " at " << slave->info.hostname()
<< " with " << slave->info.resources();
+ slavePIDs.put(slave->info.hostname(), slave->pid.port);
+ deactivatedSlavePIDs.remove(slave->info.hostname(), slave->pid.port);
slaves[slave->id] = slave;
link(slave->pid);
@@ -2030,6 +2025,8 @@ void Master::removeSlave(Slave* slave)
// TODO(benh): unlink(slave->pid);
// Delete it.
+ slavePIDs.remove(slave->info.hostname(), slave->pid.port);
+ deactivatedSlavePIDs.put(slave->info.hostname(), slave->pid.port);
slaves.erase(slave->id);
delete slave;
}
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Tue Apr 30 01:31:27 2013
@@ -115,8 +115,7 @@ public:
const FrameworkID& frameworkId,
const ExecutorID& executorId,
int32_t status);
- void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
- void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
+ void deactivateSlave(const std::string& hostname, uint16_t port);
void frameworkFailoverTimeout(const FrameworkID& frameworkId,
double reregisteredTime);
@@ -225,10 +224,11 @@ private:
MasterInfo info;
- multihashmap<std::string, uint16_t> slaveHostnamePorts;
-
hashmap<FrameworkID, Framework*> frameworks;
hashmap<SlaveID, Slave*> slaves;
+ multihashmap<std::string, uint16_t> slavePIDs;
+ multihashmap<std::string, uint16_t> deactivatedSlavePIDs;
+
hashmap<OfferID, Offer*> offers;
boost::circular_buffer<std::tr1::shared_ptr<Framework> > completedFrameworks;
Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Tue Apr 30 01:31:27 2013
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include <unistd.h>
+
#include <gmock/gmock.h>
#include <map>
@@ -27,6 +29,8 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
#include "common/protobuf_utils.hpp"
@@ -136,7 +140,7 @@ TEST_F(FaultToleranceTest, SlaveLost)
// This test checks that a scheduler gets a slave lost
// message for a partioned slave.
-TEST_F(FaultToleranceTest, SlavePartitioned)
+TEST_F(FaultToleranceTest, PartitionedSlave)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
@@ -176,7 +180,7 @@ TEST_F(FaultToleranceTest, SlavePartitio
// Now advance through the PINGs.
uint32_t pings = 0;
- while(true) {
+ while (true) {
AWAIT_READY(ping);
pings++;
if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
@@ -199,6 +203,160 @@ TEST_F(FaultToleranceTest, SlavePartitio
}
+// TODO(bmahler): Remove this when all the tests are refactored.
+class FaultToleranceClusterTest : public MesosClusterTest {};
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master, and then attempt to re-register, we deny the
+// re-registration by sending a ShutdownMessage to the slave.
+// Why? Because during a network partition, the master will remove a
+// partitioned slave, thus sending its tasks to LOST. At this point,
+// when the partition is removed, the slave will attempt to
+// re-register with its running tasks. We've already notified
+// frameworks that these tasks were LOST, so we have to have the
+// slave shut down.
+TEST_F(FaultToleranceClusterTest, PartitionedSlaveReregistration)
+{
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ MockExecutor exec;
+ Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+ ASSERT_SOME(slave);
+
+ // Allow the master to PING the slave, but drop all PONG messages
+ // from the slave. Note that we don't match on the master / slave
+ // PIDs because it's actually the SlaveObserver Process that sends
+ // the pings.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ BasicMasterDetector detector(master.get(), slave.get(), true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ // Launch a task. This is to ensure the task is killed by the slave
+ // during shutdown.
+ TaskID taskId;
+ taskId.set_value("1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->MergeFrom(taskId);
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+ task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Set up the expectations for launching the task.
+ EXPECT_CALL(exec, registered(_, _, _, _));
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> runningStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&runningStatus));
+
+ Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(runningStatus);
+ EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+
+ // Wait for the slave to have handled the acknowledgment prior
+ // to pausing the clock.
+ AWAIT_READY(statusUpdateAck);
+
+ // Drop the first shutdown message from the master (simulated
+ // partition), allow the second shutdown message to pass when
+ // the slave re-registers.
+ Future<ShutdownMessage> shutdownSlave =
+ DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ Future<TaskStatus> lostStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&lostStatus));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ Clock::pause();
+
+ // Now, induce a partition of the slave by having the master
+ // timeout the slave.
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ }
+
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // The master will have notified the framework of the lost task.
+ AWAIT_READY(lostStatus);
+ EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+ // Wait for the master to attempt to shut down the slave.
+ AWAIT_READY(shutdownSlave);
+
+ // The master will notify the framework that the slave was lost.
+ AWAIT_READY(slaveLost);
+
+ // We now complete the partition on the slave side as well. This
+ // is done by simulating a NoMasterDetectedMessage which would
+ // normally occur during a network partition.
+ process::post(slave.get(), NoMasterDetectedMessage());
+
+ Future<Nothing> shutdownExecutor;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdownExecutor));
+
+ shutdownSlave = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ // Have the slave re-register with the master.
+ NewMasterDetectedMessage newMasterDetectedMessage;
+ newMasterDetectedMessage.set_pid(master.get());
+ process::post(slave.get(), newMasterDetectedMessage);
+
+ // Upon re-registration, the master will shut down the slave.
+ // The slave will then shut down the executor.
+ AWAIT_READY(shutdownSlave);
+ AWAIT_READY(shutdownExecutor);
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ cluster.shutdown();
+}
+
+
TEST_F(FaultToleranceTest, SchedulerFailover)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Modified: incubator/mesos/trunk/src/tests/master_detector_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_detector_tests.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_detector_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_detector_tests.cpp Tue Apr 30 01:31:27 2013
@@ -26,7 +26,11 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <process/pid.hpp>
+
+#include <stout/gtest.hpp>
#include <stout/os.hpp>
+#include <stout/path.hpp>
#include <stout/try.hpp>
#include "detector/detector.hpp"
@@ -47,8 +51,10 @@ using mesos::internal::slave::Slave;
using process::Future;
using process::PID;
+using process::UPID;
using std::map;
+using std::string;
using std::vector;
using testing::_;
@@ -69,15 +75,17 @@ TEST_F(MasterDetectorTest, File)
PID<Slave> slave = process::spawn(&s);
// Write "master" to a file and use the "file://" mechanism to
- // create a master detector for the slave.
- Try<std::string> path = os::mktemp();
- ASSERT_SOME(path);
- ASSERT_SOME(os::write(path.get(), master.get()));
+ // create a master detector for the slave. Still requires a master
+ // detector for the master first.
+ BasicMasterDetector detector1(master.get(), vector<UPID>(), true);
+
+ const string& path = path::join(cluster.slaves.flags.work_dir, "master");
+ ASSERT_SOME(os::write(path, stringify(master.get())));
Try<MasterDetector*> detector =
- MasterDetector::create("file://" + path.get(), slave, false, true);
+ MasterDetector::create("file://" + path, slave, false, true);
- os::rm(path.get());
+ EXPECT_SOME(os::rm(path));
ASSERT_SOME(detector);
Modified: incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/zookeeper_tests.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/zookeeper_tests.cpp Tue Apr 30 01:31:27 2013
@@ -400,11 +400,9 @@ TEST_F(ZooKeeperTest, MasterDetectorTime
EXPECT_CALL(follower, noMasterDetected())
.WillOnce(FutureSatisfy(&followerNoMasterDetected));
- // TODO(bmahler): This will be uncommented by the fix for:
- // https://issues.apache.org/jira/browse/MESOS-305
- // Future<Nothing> nonContenderNoMasterDetected;
- // EXPECT_CALL(nonContender, noMasterDetected())
- // .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
+ Future<Nothing> nonContenderNoMasterDetected;
+ EXPECT_CALL(nonContender, noMasterDetected())
+ .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
Clock::pause();
Clock::advance(ZOOKEEPER_SESSION_TIMEOUT);
@@ -412,10 +410,7 @@ TEST_F(ZooKeeperTest, MasterDetectorTime
AWAIT_READY(leaderNoMasterDetected);
AWAIT_READY(followerNoMasterDetected);
-
- // TODO(bmahler): This will be uncommented by the fix for:
- // https://issues.apache.org/jira/browse/MESOS-305
- // AWAIT_READY(nonContenderNoMasterDetected);
+ AWAIT_READY(nonContenderNoMasterDetected);
process::terminate(leader);
process::wait(leader);