You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/30 03:31:27 UTC

svn commit: r1477442 - in /incubator/mesos/trunk/src: detector/detector.cpp master/http.cpp master/master.cpp master/master.hpp tests/fault_tolerance_tests.cpp tests/master_detector_tests.cpp tests/zookeeper_tests.cpp

Author: bmahler
Date: Tue Apr 30 01:31:27 2013
New Revision: 1477442

URL: http://svn.apache.org/r1477442
Log:
Send NoMasterDetectedMessage on session timeout to non-contending
detectors. Added a disconnected slave map to the master to track
disconnected slaves, in order to disallow slave re-registration after
a network partition.

Review: https://reviews.apache.org/r/10172

Modified:
    incubator/mesos/trunk/src/detector/detector.cpp
    incubator/mesos/trunk/src/master/http.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
    incubator/mesos/trunk/src/tests/master_detector_tests.cpp
    incubator/mesos/trunk/src/tests/zookeeper_tests.cpp

Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Tue Apr 30 01:31:27 2013
@@ -388,22 +388,11 @@ void ZooKeeperMasterDetectorProcess::tim
     timer = None();
     expire = true;
 
-    // We only send a NoMasterDetectedMessage if we are a
-    // contending detector. This is because:
-    //    If we are a non-contending detector (e.g. slave), a zk session
-    //    expiration doesn't necessarily mean a new leader (master) is elected
-    //    (e.g. the slave is partitioned from the zk server). If the leading
-    //    master stays the same (i.e., no leader election), then the
-    //    slave should still accept a ShutDownMessage from the master.
-    //    If a new master does get elected, the slave would know about it
-    //    because it would do a leader detection after it connects/re-connects.
-    if (contend) {
-      // TODO(bmahler): We always want to clear the sequence number
-      // prior to sending NoMasterDetectedMessage. It might be prudent
-      // to use a helper function to enforce this.
-      currentMasterSeq = "";  // Clear the master sequence number.
-      process::post(pid, NoMasterDetectedMessage());
-    }
+    // TODO(bmahler): We always want to clear the sequence number
+    // prior to sending NoMasterDetectedMessage. It might be prudent
+    // to use a helper function to enforce this.
+    currentMasterSeq = "";  // Clear the master sequence number.
+    process::post(pid, NoMasterDetectedMessage());
   }
 }
 

Modified: incubator/mesos/trunk/src/master/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.cpp (original)
+++ incubator/mesos/trunk/src/master/http.cpp Tue Apr 30 01:31:27 2013
@@ -265,7 +265,8 @@ Future<Response> stats(
   object.values["elected"] = master.elected; // Note: using int not bool.
   object.values["total_schedulers"] = master.frameworks.size();
   object.values["active_schedulers"] = master.getActiveFrameworks().size();
-  object.values["activated_slaves"] = master.slaveHostnamePorts.size();
+  object.values["activated_slaves"] = master.slavePIDs.size();
+  object.values["deactivated_slaves"] = master.deactivatedSlavePIDs.size();
   object.values["connected_slaves"] = master.slaves.size();
   object.values["staged_tasks"] = master.stats.tasks[TASK_STAGING];
   object.values["started_tasks"] = master.stats.tasks[TASK_STARTING];
@@ -317,7 +318,8 @@ Future<Response> state(
   object.values["start_time"] = master.startTime;
   object.values["id"] = master.info.id();
   object.values["pid"] = string(master.self());
-  object.values["activated_slaves"] = master.slaveHostnamePorts.size();
+  object.values["activated_slaves"] = master.slavePIDs.size();
+  object.values["deactivated_slaves"] = master.deactivatedSlavePIDs.size();
   object.values["connected_slaves"] = master.slaves.size();
   object.values["staged_tasks"] = master.stats.tasks[TASK_STAGING];
   object.values["started_tasks"] = master.stats.tasks[TASK_STARTING];

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Tue Apr 30 01:31:27 2013
@@ -173,10 +173,7 @@ protected:
   void deactivate()
   {
     dispatch(
-        master,
-        &Master::deactivatedSlaveHostnamePort,
-        slaveInfo.hostname(),
-        slave.port);
+        master, &Master::deactivateSlave, slaveInfo.hostname(), slave.port);
   }
 
 private:
@@ -888,7 +885,6 @@ void Master::registerSlave(const SlaveIn
 
   // TODO(benh): We assume all slaves can register for now.
   CHECK(flags.slaves == "*");
-  activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
   addSlave(slave);
 
 //   // Checks if this slave, or if all slaves, can be accepted.
@@ -917,7 +913,15 @@ void Master::reregisterSlave(const Slave
   }
 
   if (slaveId == "") {
-    LOG(ERROR) << "Slave re-registered without an id!";
+    LOG(ERROR) << "Slave " << from << " re-registered without an id!";
+    reply(ShutdownMessage());
+  } else if (deactivatedSlavePIDs.contains(slaveInfo.hostname(), from.port)) {
+    // We disallow deactivated slaves from re-registering. This is
+    // to ensure that when a master deactivates a slave that was
+    // partitioned, we don't allow the slave to re-register, as we've
+    // already informed frameworks that the tasks were lost.
+    LOG(ERROR) << "Slave " << from
+               << " attempted to re-register after deactivation";
     reply(ShutdownMessage());
   } else {
     Slave* slave = getSlave(slaveId);
@@ -995,7 +999,6 @@ void Master::reregisterSlave(const Slave
 
       // TODO(benh): We assume all slaves can register for now.
       CHECK(flags.slaves == "*");
-      activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
       readdSlave(slave, executorInfos, tasks);
 
 //       // Checks if this slave, or if all slaves, can be accepted.
@@ -1183,18 +1186,9 @@ void Master::exitedExecutor(const SlaveI
 }
 
 
-void Master::activatedSlaveHostnamePort(const string& hostname, uint16_t port)
-{
-  LOG(INFO) << "Master now considering a slave at "
-            << hostname << ":" << port << " as active";
-  slaveHostnamePorts.put(hostname, port);
-}
-
-
-void Master::deactivatedSlaveHostnamePort(const string& hostname,
-                                          uint16_t port)
+void Master::deactivateSlave(const string& hostname, uint16_t port)
 {
-  if (slaveHostnamePorts.contains(hostname, port)) {
+  if (slavePIDs.contains(hostname, port)) {
     // Look for a connected slave and remove it.
     foreachvalue (Slave* slave, slaves) {
       if (slave->info.hostname() == hostname && slave->pid.port == port) {
@@ -1209,7 +1203,6 @@ void Master::deactivatedSlaveHostnamePor
 
     LOG(INFO) << "Master now considering a slave at "
 	            << hostname << ":" << port << " as inactive";
-    slaveHostnamePorts.remove(hostname, port);
   }
 }
 
@@ -1855,6 +1848,8 @@ void Master::addSlave(Slave* slave, bool
             << " at " << slave->info.hostname()
             << " with " << slave->info.resources();
 
+  slavePIDs.put(slave->info.hostname(), slave->pid.port);
+  deactivatedSlavePIDs.remove(slave->info.hostname(), slave->pid.port);
   slaves[slave->id] = slave;
 
   link(slave->pid);
@@ -2030,6 +2025,8 @@ void Master::removeSlave(Slave* slave)
   // TODO(benh): unlink(slave->pid);
 
   // Delete it.
+  slavePIDs.remove(slave->info.hostname(), slave->pid.port);
+  deactivatedSlavePIDs.put(slave->info.hostname(), slave->pid.port);
   slaves.erase(slave->id);
   delete slave;
 }

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Tue Apr 30 01:31:27 2013
@@ -115,8 +115,7 @@ public:
                       const FrameworkID& frameworkId,
                       const ExecutorID& executorId,
                       int32_t status);
-  void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
-  void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
+  void deactivateSlave(const std::string& hostname, uint16_t port);
   void frameworkFailoverTimeout(const FrameworkID& frameworkId,
                                 double reregisteredTime);
 
@@ -225,10 +224,11 @@ private:
 
   MasterInfo info;
 
-  multihashmap<std::string, uint16_t> slaveHostnamePorts;
-
   hashmap<FrameworkID, Framework*> frameworks;
   hashmap<SlaveID, Slave*> slaves;
+  multihashmap<std::string, uint16_t> slavePIDs;
+  multihashmap<std::string, uint16_t> deactivatedSlavePIDs;
+
   hashmap<OfferID, Offer*> offers;
 
   boost::circular_buffer<std::tr1::shared_ptr<Framework> > completedFrameworks;

Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Tue Apr 30 01:31:27 2013
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include <unistd.h>
+
 #include <gmock/gmock.h>
 
 #include <map>
@@ -27,6 +29,8 @@
 
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
 
 #include "common/protobuf_utils.hpp"
 
@@ -136,7 +140,7 @@ TEST_F(FaultToleranceTest, SlaveLost)
 
 // This test checks that a scheduler gets a slave lost
 // message for a partioned slave.
-TEST_F(FaultToleranceTest, SlavePartitioned)
+TEST_F(FaultToleranceTest, PartitionedSlave)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
@@ -176,7 +180,7 @@ TEST_F(FaultToleranceTest, SlavePartitio
 
   // Now advance through the PINGs.
   uint32_t pings = 0;
-  while(true) {
+  while (true) {
     AWAIT_READY(ping);
     pings++;
     if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
@@ -199,6 +203,160 @@ TEST_F(FaultToleranceTest, SlavePartitio
 }
 
 
+// TODO(bmahler): Remove this when all the tests are refactored.
+class FaultToleranceClusterTest : public MesosClusterTest {};
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master, and then attempt to re-register, we deny the
+// re-registration by sending a ShutdownMessage to the slave.
+// Why? Because during a network partition, the master will remove a
+// partitioned slave, thus sending its tasks to LOST. At this point,
+// when the partition is removed, the slave will attempt to
+// re-register with its running tasks. We've already notified
+// frameworks that these tasks were LOST, so we have to have the slave
+// slave shut down.
+TEST_F(FaultToleranceClusterTest, PartitionedSlaveReregistration)
+{
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  MockExecutor exec;
+  Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+  ASSERT_SOME(slave);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the SlaveObserver Process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  BasicMasterDetector detector(master.get(), slave.get(), true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  // Launch a task. This is to ensure the task is killed by the slave,
+  // during shutdown.
+  TaskID taskId;
+  taskId.set_value("1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->MergeFrom(taskId);
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Set up the expectations for launching the task.
+  EXPECT_CALL(exec, registered(_, _, _, _));
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+      slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+
+  // Wait for the slave to have handled the acknowledgment prior
+  // to pausing the clock.
+  AWAIT_READY(statusUpdateAck);
+
+  // Drop the first shutdown message from the master (simulated
+  // partition), allow the second shutdown message to pass when
+  // the slave re-registers,
+  Future<ShutdownMessage> shutdownSlave =
+    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  Future<TaskStatus> lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  Clock::pause();
+
+  // Now, induce a partition of the slave by having the master
+  // timeout the slave.
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+     break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // The master will have notified the framework of the lost task.
+  AWAIT_READY(lostStatus);
+  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+  // Wait for the master to attempt to shut down the slave.
+  AWAIT_READY(shutdownSlave);
+
+  // The master will notify the framework that the slave was lost.
+  AWAIT_READY(slaveLost);
+
+  // We now complete the partition on the slave side as well. This
+  // is done by simulating a NoMasterDetectedMessage which would
+  // normally occur during a network partition.
+  process::post(slave.get(), NoMasterDetectedMessage());
+
+  Future<Nothing> shutdownExecutor;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdownExecutor));
+
+  shutdownSlave = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  // Have the slave re-register with the master.
+  NewMasterDetectedMessage newMasterDetectedMessage;
+  newMasterDetectedMessage.set_pid(master.get());
+  process::post(slave.get(), newMasterDetectedMessage);
+
+  // Upon re-registration, the master will shutdown the slave.
+  // The slave will then shut down the executor.
+  AWAIT_READY(shutdownSlave);
+  AWAIT_READY(shutdownExecutor);
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  cluster.shutdown();
+}
+
+
 TEST_F(FaultToleranceTest, SchedulerFailover)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);

Modified: incubator/mesos/trunk/src/tests/master_detector_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_detector_tests.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_detector_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_detector_tests.cpp Tue Apr 30 01:31:27 2013
@@ -26,7 +26,11 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <process/pid.hpp>
+
+#include <stout/gtest.hpp>
 #include <stout/os.hpp>
+#include <stout/path.hpp>
 #include <stout/try.hpp>
 
 #include "detector/detector.hpp"
@@ -47,8 +51,10 @@ using mesos::internal::slave::Slave;
 
 using process::Future;
 using process::PID;
+using process::UPID;
 
 using std::map;
+using std::string;
 using std::vector;
 
 using testing::_;
@@ -69,15 +75,17 @@ TEST_F(MasterDetectorTest, File)
   PID<Slave> slave = process::spawn(&s);
 
   // Write "master" to a file and use the "file://" mechanism to
-  // create a master detector for the slave.
-  Try<std::string> path = os::mktemp();
-  ASSERT_SOME(path);
-  ASSERT_SOME(os::write(path.get(), master.get()));
+  // create a master detector for the slave. Still requires a master
+  // detector for the master first.
+  BasicMasterDetector detector1(master.get(), vector<UPID>(), true);
+
+  const string& path = path::join(cluster.slaves.flags.work_dir, "master");
+  ASSERT_SOME(os::write(path, stringify(master.get())));
 
   Try<MasterDetector*> detector =
-    MasterDetector::create("file://" + path.get(), slave, false, true);
+    MasterDetector::create("file://" + path, slave, false, true);
 
-  os::rm(path.get());
+  EXPECT_SOME(os::rm(path));
 
   ASSERT_SOME(detector);
 

Modified: incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/zookeeper_tests.cpp?rev=1477442&r1=1477441&r2=1477442&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/zookeeper_tests.cpp Tue Apr 30 01:31:27 2013
@@ -400,11 +400,9 @@ TEST_F(ZooKeeperTest, MasterDetectorTime
   EXPECT_CALL(follower, noMasterDetected())
     .WillOnce(FutureSatisfy(&followerNoMasterDetected));
 
-  // TODO(bmahler): This will be uncommented by the fix for:
-  // https://issues.apache.org/jira/browse/MESOS-305
-  //  Future<Nothing> nonContenderNoMasterDetected;
-  //  EXPECT_CALL(nonContender, noMasterDetected())
-  //    .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
+  Future<Nothing> nonContenderNoMasterDetected;
+  EXPECT_CALL(nonContender, noMasterDetected())
+    .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
 
   Clock::pause();
   Clock::advance(ZOOKEEPER_SESSION_TIMEOUT);
@@ -412,10 +410,7 @@ TEST_F(ZooKeeperTest, MasterDetectorTime
 
   AWAIT_READY(leaderNoMasterDetected);
   AWAIT_READY(followerNoMasterDetected);
-
-  // TODO(bmahler): This will be uncommented by the fix for:
-  // https://issues.apache.org/jira/browse/MESOS-305
-  // AWAIT_READY(nonContenderNoMasterDetected);
+  AWAIT_READY(nonContenderNoMasterDetected);
 
   process::terminate(leader);
   process::wait(leader);