You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/12/11 01:03:50 UTC

svn commit: r1419937 - in /incubator/mesos/branches/0.10.x/src: detector/detector.cpp detector/detector.hpp slave/slave.cpp tests/zookeeper_server.hpp tests/zookeeper_tests.cpp

Author: benh
Date: Tue Dec 11 00:03:49 2012
New Revision: 1419937

URL: http://svn.apache.org/viewvc?rev=1419937&view=rev
Log:
*** MODIFIED FOR 0.10.0 ***
Fixed the master detector to only send a NoMasterDetected message to a
leading master when the detector gets partitioned and we decide to
manually expire the session.

From: Vinod Kone <vi...@gmail.com>
Review: https://reviews.apache.org/r/7746

Modified:
    incubator/mesos/branches/0.10.x/src/detector/detector.cpp
    incubator/mesos/branches/0.10.x/src/detector/detector.hpp
    incubator/mesos/branches/0.10.x/src/slave/slave.cpp
    incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp
    incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp

Modified: incubator/mesos/branches/0.10.x/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/detector/detector.cpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/detector/detector.cpp (original)
+++ incubator/mesos/branches/0.10.x/src/detector/detector.cpp Tue Dec 11 00:03:49 2012
@@ -23,6 +23,8 @@
 #include <vector>
 
 #include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 #include <process/timer.hpp>
@@ -43,9 +45,7 @@
 #include "zookeeper/watcher.hpp"
 #include "zookeeper/zookeeper.hpp"
 
-using namespace mesos;
-using namespace mesos::internal;
-
+using process::Future;
 using process::Process;
 using process::Timer;
 using process::UPID;
@@ -55,6 +55,9 @@ using std::pair;
 using std::string;
 using std::vector;
 
+namespace mesos {
+namespace internal {
+
 // TODO(benh): Make this value configurable via flags and verify that
 // it is always LESS THAN the slave heartbeat timeout.
 const seconds ZOOKEEPER_SESSION_TIMEOUT(10.0);
@@ -74,6 +77,9 @@ public:
 
   virtual void initialize();
 
+  // ZooKeeperMasterDetector implementation.
+  int64_t session();
+
   // ZooKeeper events.
   void connected(bool reconnect);
   void reconnecting();
@@ -108,33 +114,6 @@ private:
 };
 
 
-class ZooKeeperMasterDetector : public MasterDetector
-{
-public:
-  /**
-   * Uses ZooKeeper for both detecting masters and contending to be a
-   * master.
-   *
-   * @param server comma separated list of server host:port pairs
-   * @param znode top-level "ZooKeeper node" (directory) to use
-   * @param pid libprocess pid to send messages/updates to (and to
-   * use for contending to be a master)
-   * @param contend true if should contend to be master and false otherwise (not
-   * needed for slaves and frameworks)
-   * @param quiet verbosity logging level for underlying ZooKeeper library
-   */
-  ZooKeeperMasterDetector(const zookeeper::URL& url,
-                          const UPID& pid,
-                          bool contend,
-                          bool quiet);
-
-  virtual ~ZooKeeperMasterDetector();
-
-private:
-  ZooKeeperMasterDetectorProcess* process;
-};
-
-
 MasterDetector::~MasterDetector() {}
 
 
@@ -292,6 +271,13 @@ void ZooKeeperMasterDetectorProcess::ini
 }
 
 
+int64_t ZooKeeperMasterDetectorProcess::session()
+{
+  CHECK_NOTNULL(zk);
+  return zk->getSessionId();
+}
+
+
 void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
 {
   if (!reconnect) {
@@ -387,7 +373,7 @@ void ZooKeeperMasterDetectorProcess::con
 void ZooKeeperMasterDetectorProcess::reconnecting()
 {
   LOG(INFO) << "Master detector lost connection to ZooKeeper, "
-	    << "attempting to reconnect ...";
+            << "attempting to reconnect ...";
 
   // ZooKeeper won't tell us of a session expiration until we
   // reconnect, which could occur much much later than the session was
@@ -454,7 +440,28 @@ void ZooKeeperMasterDetectorProcess::tim
                  << "(sessionId=" << std::hex << sessionId << ")";
     timer = Option<Timer>::none();
     expire = true;
-    process::post(pid, NoMasterDetectedMessage());
+
+    // We only send a NoMasterDetectedMessage if we are a
+    // contending detector AND ALSO the current leader.
+    // This is because:
+    // 1) If we are a non-contending detector (e.g. slave), a zk session
+    //    expiration doesn't necessarily mean a new leader (master) is elected
+    //    (e.g. the slave is partitioned from the zk server). If the leading
+    //    master stays the same (i.e., no leader election), then the
+    //    slave should still accept a ShutDownMessage from the master.
+    //    If a new master does get elected, the slave would know about it
+    //    because it would do a leader detection after it connects/re-connects.
+    // 2) If we are a contender but not the leader (e.g. non-leading master),
+    //    sending a NoMasterDetectedMessage() is bad because a partitioned
+    //    non-leading master would never learn about a leading master that
+    //    stays the same (i.e., no leader election) even after it has
+    //    connected/reconnected to ZooKeeper. This is because the
+    //    master detection code (detectMaster()) will not send a
+    //    NewMasterDetectedMessage to the non-leading master, as there is no
+    //    change in the currentMasterSeq.
+    if (contend && currentMasterPID == pid) {
+      process::post(pid, NoMasterDetectedMessage());
+    }
   }
 }
 
@@ -466,8 +473,18 @@ void ZooKeeperMasterDetectorProcess::det
   int code = zk->getChildren(url.path, true, &results);
 
   if (code != ZOK) {
-    LOG(ERROR) << "Master detector failed to get masters: "
-               << zk->message(code);
+    if (zk->retryable(code)) {
+      // NOTE: We don't expect a ZNONODE here because 'url.path' is always
+      // created in the connected() call. Despite that, we don't do a
+      // CHECK(code != ZNONODE), just to be safe in case the zk client
+      // library does return that code unexpectedly.
+      LOG(ERROR) << "Master detector failed to get masters: "
+                 << zk->message(code);
+      return; // Try again when we reconnect.
+    } else {
+      LOG(FATAL) << "Non-retryable ZooKeeper error while getting masters: "
+                 << zk->message(code);
+    }
   } else {
     LOG(INFO) << "Master detector found " << results.size()
               << " registered masters";
@@ -490,6 +507,7 @@ void ZooKeeperMasterDetectorProcess::det
 
   // No master present (lost or possibly hasn't come up yet).
   if (masterSeq.empty()) {
+    LOG(INFO) << "Master detector of " << pid << " couldn't find any masters";
     process::post(pid, NoMasterDetectedMessage());
   } else if (masterSeq != currentMasterSeq) {
     // Okay, let's fetch the master pid from ZooKeeper.
@@ -499,9 +517,19 @@ void ZooKeeperMasterDetectorProcess::det
     if (code != ZOK) {
       // This is possible because the master might have failed since
       // the invocation of ZooKeeper::getChildren above.
-      LOG(ERROR) << "Master detector failed to fetch new master pid: "
-		 << zk->message(code);
-      process::post(pid, NoMasterDetectedMessage());
+      // It is fine not to send a NoMasterDetectedMessage here because:
+      // 1) If this is due to a connection loss or session expiration,
+      //    connected() or expired() will be called and the leader detection
+      //    code (detectMaster()) will be retried.
+      // 2) If this is due to no masters present (i.e., code == ZNONODE),
+      //    updated() will be called and detectMaster() will be retried.
+      if (zk->retryable(code)) {
+        LOG(ERROR) << "Master detector failed to fetch new master pid: "
+                   << zk->message(code);
+      } else {
+        LOG(FATAL) << "Non-retryable ZooKeeper error while fetching "
+                   << "new master pid: " << zk->message(code);
+      }
     } else {
       // Now let's parse what we fetched from ZooKeeper.
       LOG(INFO) << "Master detector got new master pid: " << result;
@@ -509,18 +537,18 @@ void ZooKeeperMasterDetectorProcess::det
       UPID masterPid = result;
 
       if (masterPid == UPID()) {
-	// TODO(benh): Maybe we should try again then!?!? Parsing
-	// might have failed because of DNS, and whoever is using the
-	// detector might sit "unconnected" indefinitely!
-	LOG(ERROR) << "Failed to parse new master pid!";
-	process::post(pid, NoMasterDetectedMessage());
+        // TODO(benh): Maybe we should try again then!?!? Parsing
+        // might have failed because of DNS, and whoever is using the
+        // detector might sit "unconnected" indefinitely!
+        LOG(ERROR) << "Failed to parse new master pid!";
+        process::post(pid, NoMasterDetectedMessage());
       } else {
-	currentMasterSeq = masterSeq;
-	currentMasterPID = masterPid;
+        currentMasterSeq = masterSeq;
+        currentMasterPID = masterPid;
 
         NewMasterDetectedMessage message;
-	message.set_pid(currentMasterPID);
-	process::post(pid, message);
+        message.set_pid(currentMasterPID);
+        process::post(pid, message);
       }
     }
   }
@@ -544,3 +572,12 @@ ZooKeeperMasterDetector::~ZooKeeperMaste
   wait(process);
   delete process;
 }
+
+
+Future<int64_t> ZooKeeperMasterDetector::session()
+{
+  return dispatch(process, &ZooKeeperMasterDetectorProcess::session);
+}
+
+} // namespace internal {
+} // namespace mesos {

Modified: incubator/mesos/branches/0.10.x/src/detector/detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/detector/detector.hpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/detector/detector.hpp (original)
+++ incubator/mesos/branches/0.10.x/src/detector/detector.hpp Tue Dec 11 00:03:49 2012
@@ -25,13 +25,20 @@
 #include <climits>
 #include <cstdlib>
 
+#include <process/future.hpp>
+#include <process/pid.hpp>
 #include <process/process.hpp>
 
 #include <stout/try.hpp>
 
+#include "zookeeper/url.hpp"
+
 namespace mesos {
 namespace internal {
 
+class ZooKeeperMasterDetectorProcess; // Forward declaration.
+
+
 /**
  * Implements functionality for:
  *   a) detecting masters
@@ -104,8 +111,38 @@ private:
   const process::UPID master;
 };
 
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+  /**
+   * Uses ZooKeeper for both detecting masters and contending to be a
+   * master.
+   *
+   * @param url ZooKeeper URL (server list and znode path) of the masters
+   * @param pid libprocess pid to send messages/updates to (and to
+   * use for contending to be a master)
+   * @param contend true if should contend to be master and false otherwise (not
+   * needed for slaves and frameworks)
+   * @param quiet verbosity logging level for underlying ZooKeeper library
+   */
+  ZooKeeperMasterDetector(const zookeeper::URL& url,
+                          const process::UPID& pid,
+                          bool contend,
+                          bool quiet);
+
+  virtual ~ZooKeeperMasterDetector();
+
+  /**
+   * Returns the ZooKeeper session ID associated with this detector.
+   */
+  process::Future<int64_t> session();
+
+private:
+  ZooKeeperMasterDetectorProcess* process;
+};
+
 } // namespace internal {
 } // namespace mesos {
 
 #endif // __MASTER_DETECTOR_HPP__
-

Modified: incubator/mesos/branches/0.10.x/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/slave/slave.cpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/slave/slave.cpp (original)
+++ incubator/mesos/branches/0.10.x/src/slave/slave.cpp Tue Dec 11 00:03:49 2012
@@ -336,7 +336,7 @@ void Slave::shutdown()
 {
   if (from != master) {
     LOG(WARNING) << "Ignoring shutdown message from " << from
-                 << "because it is not from the registered master ("
+                 << " because it is not from the registered master ("
                  << master << ")";
     return;
   }

Modified: incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp (original)
+++ incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp Tue Dec 11 00:03:49 2012
@@ -53,7 +53,16 @@ public:
   // the first call and re-uses the port on subsequent calls.
   int startNetwork();
 
-  // Forces the server to expire the given session immediately.
+  // Forces the server to expire the given session.
+  // Note that there is a delay (~3s) for the corresponding client to receive
+  // a session expiration event notification from the ZooKeeper server.
+  // TODO(vinod): Fix this so that the notification is immediate.
+  // One possible solution is suggested at:
+  // http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
+  // But according to
+  // http://comments.gmane.org/gmane.comp.java.hadoop.zookeeper.user/4489,
+  // the C binding for ZooKeeper does not yet support multiple
+  // clients with the same session id.
   void expireSession(int64_t sessionId);
 
 private:

Modified: incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp (original)
+++ incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp Tue Dec 11 00:03:49 2012
@@ -39,6 +39,7 @@
 
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
 
 using namespace mesos::internal;
 using namespace mesos::internal::test;
@@ -46,6 +47,7 @@ using namespace mesos::internal::test;
 using process::Clock;
 
 using testing::_;
+using testing::AtMost;
 using testing::Return;
 
 
@@ -275,7 +277,7 @@ TEST_F(ZooKeeperTest, MasterDetectorShut
 
   zks->startNetwork();
 
-  WAIT_FOR(newMasterDetectedCall2, seconds(5.0));
+  WAIT_FOR(newMasterDetectedCall2, seconds(5.0)); // ZooKeeper needs extra time.
 
   MasterDetector::destroy(detector.get());
 
@@ -286,6 +288,70 @@ TEST_F(ZooKeeperTest, MasterDetectorShut
 }
 
 
+TEST_F(ZooKeeperTest, MasterDetectorExpireZKSession)
+{
+  // Simulate a leading master.
+  MockMasterDetectorListenerProcess leader;
+
+  trigger newMasterDetectedCall1, newMasterDetectedCall2;
+  EXPECT_CALL(leader, newMasterDetected(_))
+    .WillOnce(Trigger(&newMasterDetectedCall1))
+    .WillOnce(Trigger(&newMasterDetectedCall2));
+
+  EXPECT_CALL(leader, noMasterDetected())
+    .Times(0);
+
+  process::spawn(leader);
+
+  std::string znode = "zk://" + zks->connectString() + "/mesos";
+
+  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
+  ASSERT_TRUE(url.isSome());
+
+  // Leader's detector.
+  ZooKeeperMasterDetector leaderDetector(url.get(), leader.self(), true, true);
+
+  WAIT_UNTIL(newMasterDetectedCall1);
+
+  // Simulate a following master.
+  MockMasterDetectorListenerProcess follower;
+
+  trigger newMasterDetectedCall3;
+  EXPECT_CALL(follower, newMasterDetected(_))
+    .WillOnce(Trigger(&newMasterDetectedCall3))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(follower, noMasterDetected())
+    .Times(0);
+
+  process::spawn(follower);
+
+  // Follower's detector.
+  ZooKeeperMasterDetector followerDetector(
+      url.get(), follower.self(), true, true);
+
+  WAIT_UNTIL(newMasterDetectedCall3);
+
+  // Now expire the leader's zk session.
+  process::Future<int64_t> session = leaderDetector.session();
+  session.await();
+  ASSERT_FALSE(session.isFailed()) << session.failure();
+  ASSERT_FALSE(session.isDiscarded());
+  ASSERT_TRUE(session.isReady());
+
+  zks->expireSession(session.get());
+
+  // Wait for session expiration and ensure we receive a newMasterDetected call.
+  WAIT_FOR(newMasterDetectedCall2, seconds(5.0)); // ZooKeeper needs extra time.
+
+  process::terminate(follower);
+  process::wait(follower);
+
+  process::terminate(leader);
+  process::wait(leader);
+}
+
+
 TEST_F(ZooKeeperTest, Group)
 {
   zookeeper::Group group(zks->connectString(), NO_TIMEOUT, "/test/");