You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/11/04 02:29:29 UTC

svn commit: r1405459 - /incubator/mesos/branches/0.10.0/src/detector/detector.cpp

Author: benh
Date: Sun Nov  4 01:29:29 2012
New Revision: 1405459

URL: http://svn.apache.org/viewvc?rev=1405459&view=rev
Log:
*** MODIFIED FOR 0.10.0 ***
Refactored underlying ZooKeeper master detector into a libprocess
process (https://reviews.apache.org/r/7290).

Modified:
    incubator/mesos/branches/0.10.0/src/detector/detector.cpp

Modified: incubator/mesos/branches/0.10.0/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.0/src/detector/detector.cpp?rev=1405459&r1=1405458&r2=1405459&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.0/src/detector/detector.cpp (original)
+++ incubator/mesos/branches/0.10.0/src/detector/detector.cpp Sun Nov  4 01:29:29 2012
@@ -26,6 +26,7 @@
 #include <process/protobuf.hpp>
 
 #include <stout/foreach.hpp>
+#include <stout/option.hpp>
 
 #include "detector/detector.hpp"
 
@@ -35,6 +36,7 @@
 
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/url.hpp"
+#include "zookeeper/watcher.hpp"
 #include "zookeeper/zookeeper.hpp"
 
 using namespace mesos;
@@ -49,8 +51,55 @@ using std::pair;
 using std::string;
 using std::vector;
 
+// TODO(benh): Make this value configurable via flags and verify that
+// it is always LESS THAN the slave heartbeat timeout.
+const seconds ZOOKEEPER_SESSION_TIMEOUT(10.0);
 
-class ZooKeeperMasterDetector : public MasterDetector, public Watcher
+
+class ZooKeeperMasterDetectorProcess
+  : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+  ZooKeeperMasterDetectorProcess(
+    const zookeeper::URL& url,
+    const UPID& pid,
+    bool contend,
+    bool quiet);
+
+  virtual ~ZooKeeperMasterDetectorProcess();
+
+  virtual void initialize();
+
+  // ZooKeeper events.
+  void connected(bool reconnect);
+  void reconnecting();
+  void expired();
+  void updated(const string& path);
+  void created(const string& path);
+  void deleted(const string& path);
+
+private:
+  // Attempts to detect a master.
+  void detectMaster();
+
+  const zookeeper::URL url;
+  const ACL_vector acl;
+
+  const UPID pid;
+  bool contend;
+
+  Watcher* watcher;
+  ZooKeeper* zk;
+
+  // Our sequence string if contending to be a master.
+  Option<string> mySeq;
+
+  string currentMasterSeq;
+  UPID currentMasterPID;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
 {
 public:
   /**
@@ -72,38 +121,8 @@ public:
 
   virtual ~ZooKeeperMasterDetector();
 
-  /**
-   * ZooKeeper watcher callback.
-   */
-  virtual void process(ZooKeeper *zk, int type, int state, const string &path);
-
 private:
-  void connected();
-  void reconnecting();
-  void reconnected();
-  void expired();
-  void updated(const string& path);
-
-  /**
-   * Attempts to detect a master.
-   */
-  void detectMaster();
-
-  const zookeeper::URL url;
-  const UPID pid;
-  bool contend;
-
-  bool reconnect;
-
-  ACL_vector acl;
-
-  ZooKeeper* zk;
-
-  // Our sequence string if contending to be a master.
-  string mySeq;
-
-  string currentMasterSeq;
-  UPID currentMasterPID;
+  ZooKeeperMasterDetectorProcess* process;
 };
 
 
@@ -254,144 +273,150 @@ BasicMasterDetector::BasicMasterDetector
 BasicMasterDetector::~BasicMasterDetector() {}
 
 
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
     const zookeeper::URL& _url,
     const UPID& _pid,
     bool _contend,
     bool quiet)
   : url(_url),
+    acl(url.authentication.isSome()
+        ? zookeeper::EVERYONE_READ_CREATOR_ALL
+        : ZOO_OPEN_ACL_UNSAFE),
     pid(_pid),
     contend(_contend),
-    reconnect(false)
+    watcher(NULL),
+    zk(NULL)
 {
   // Set verbosity level for underlying ZooKeeper library logging.
   // TODO(benh): Put this in the C++ API.
   zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
+}
 
-  acl = url.authentication.isSome()
-    ? zookeeper::EVERYONE_READ_CREATOR_ALL
-    : ZOO_OPEN_ACL_UNSAFE;
 
-  // Start up the ZooKeeper connection!
-  zk = new ZooKeeper(url.servers, milliseconds(10000), this);
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+  delete zk;
+  delete watcher;
 }
 
-ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+
+void ZooKeeperMasterDetectorProcess::initialize()
 {
-  delete zk;
+  // Doing initialization here avoids the race between instantiating
+  // the ZooKeeper instance and being spawned ourselves.
+  watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
+  zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
 }
 
 
-void ZooKeeperMasterDetector::connected()
+void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
 {
-  LOG(INFO) << "Master detector connected to ZooKeeper ...";
+  if (!reconnect) {
+    LOG(INFO) << "Master detector connected to ZooKeeper ...";
 
-  int code;
-  if (url.authentication.isSome()) {
-    const std::string& scheme = url.authentication.get().scheme;
-    const std::string& credentials = url.authentication.get().credentials;
-    LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
-    code = zk->authenticate(scheme, credentials);
-    if (code != ZOK) {
-      LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
-                 << zk->message(code);
+    int code;
+    if (url.authentication.isSome()) {
+      const std::string& scheme = url.authentication.get().scheme;
+      const std::string& credentials = url.authentication.get().credentials;
+      LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
+      code = zk->authenticate(scheme, credentials);
+      if (code != ZOK) {
+        LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
+                   << zk->message(code);
+      }
     }
-  }
 
-  string result;
+    string result;
 
-  static const string delimiter = "/";
+    static const string delimiter = "/";
 
-  // Assume the path (chroot) being used does not end with a "/".
-  CHECK(url.path.at(url.path.length() - 1) != '/');
+    // Assume the path (chroot) being used does not end with a "/".
+    CHECK(url.path.at(url.path.length() - 1) != '/');
 
-  // Create znodes as necessary.
-  size_t index = url.path.find(delimiter, 0);
+    // Create znodes as necessary.
+    size_t index = url.path.find(delimiter, 0);
 
-  while (index < string::npos) {
-    // Get out the prefix to create.
-    index = url.path.find(delimiter, index + 1);
-    string prefix = url.path.substr(0, index);
+    while (index < string::npos) {
+      // Get out the prefix to create.
+      index = url.path.find(delimiter, index + 1);
+      string prefix = url.path.substr(0, index);
 
-    LOG(INFO) << "Trying to create znode '" << prefix << "' in ZooKeeper";
+      LOG(INFO) << "Trying to create znode '" << prefix << "' in ZooKeeper";
 
-    // Create the node (even if it already exists).
-    code = zk->create(prefix, "", acl, 0, &result);
+      // Create the node (even if it already exists).
+      code = zk->create(prefix, "", acl, 0, &result);
 
-    if (code != ZOK && code != ZNODEEXISTS) {
-      LOG(FATAL) << "Failed to create ZooKeeper znode: " << zk->message(code);
+      if (code != ZOK && code != ZNODEEXISTS) {
+        LOG(FATAL) << "Failed to create ZooKeeper znode: " << zk->message(code);
+      }
     }
-  }
 
-  // Wierdness in ZooKeeper timing, let's check that everything is created.
-  code = zk->get(url.path, false, &result, NULL);
-
-  if (code != ZOK) {
-    LOG(FATAL) << "Unexpected ZooKeeper failure: " << zk->message(code);
-  }
-
-  if (contend) {
-    // We contend with the pid given in constructor.
-    code = zk->create(url.path + "/", pid, acl,
-                     ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+    // Weirdness in ZooKeeper timing, let's check that everything is created.
+    code = zk->get(url.path, false, &result, NULL);
 
     if (code != ZOK) {
-      LOG(FATAL) << "Unexpected ZooKeeper failure: %s" << zk->message(code);
+      LOG(FATAL) << "Unexpected ZooKeeper failure: " << zk->message(code);
     }
 
-    // Save the sequence id but only grab the basename, e.g.,
-    // "/path/to/znode/000000131" => "000000131".
-    size_t index;
-    if ((index = result.find_last_of('/')) != string::npos) {
-      mySeq = result.erase(0, index + 1);
-    } else {
-      mySeq = result;
+    if (contend) {
+      // We contend with the pid given in constructor.
+      code = zk->create(url.path + "/", pid, acl,
+                        ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+
+      if (code != ZOK) {
+        LOG(FATAL) << "Unexpected ZooKeeper failure: %s" << zk->message(code);
+      }
+
+      // Save the sequence id but only grab the basename, e.g.,
+      // "/path/to/znode/000000131" => "000000131".
+      size_t index;
+      if ((index = result.find_last_of('/')) != string::npos) {
+        mySeq = result.erase(0, index + 1);
+      } else {
+        mySeq = result;
+      }
+
+      LOG(INFO) << "Created ephemeral/sequence:" << mySeq.get();
+
+      GotMasterTokenMessage message;
+      message.set_token(mySeq.get());
+      process::post(pid, message);
     }
 
-    LOG(INFO) << "Created ephemeral/sequence:" << mySeq;
+    // Now determine who the master is (it may be us).
+    detectMaster();
+  } else {
+    LOG(INFO) << "Master detector reconnected ...";
 
-    GotMasterTokenMessage message;
-    message.set_token(mySeq);
-    process::post(pid, message);
+    // Either we were the master and we're still the master (because
+    // we haven't yet gotten a session expiration), or someone else
+    // was the master and they're still the master, or someone else
+    // was the master and a different node is now the master. Either
+    // way, run the leader detector.
+    detectMaster();
   }
-
-  // Now determine who the master is (it may be us).
-  detectMaster();
 }
 
 
-void ZooKeeperMasterDetector::reconnecting()
+void ZooKeeperMasterDetectorProcess::reconnecting()
 {
   LOG(INFO) << "Master detector lost connection to ZooKeeper, "
 	    << "attempting to reconnect ...";
 }
 
 
-void ZooKeeperMasterDetector::reconnected()
-{
-  LOG(INFO) << "Master detector reconnected ...";
-
-  // Either we were the master and we're still the master (because we
-  // haven't yet gotten a session expiration), or someone else was the
-  // master and they're still the master, or someone else was the
-  // master and someone else still is now the master. Either way, run
-  // the leader detector.
-  detectMaster();
-}
-
-
-void ZooKeeperMasterDetector::expired()
+void ZooKeeperMasterDetectorProcess::expired()
 {
   LOG(WARNING) << "Master detector ZooKeeper session expired!";
 
   CHECK(zk != NULL);
   delete zk;
 
-  zk = new ZooKeeper(url.servers, milliseconds(10000), this);
+  zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
 }
 
 
-void ZooKeeperMasterDetector::updated(const string& path)
+void ZooKeeperMasterDetectorProcess::updated(const string& path)
 {
   // A new master might have showed up and created a sequence
   // identifier or a master may have died, determine who the master is now!
@@ -399,42 +424,19 @@ void ZooKeeperMasterDetector::updated(co
 }
 
 
-void ZooKeeperMasterDetector::process(ZooKeeper* zk, int type, int state,
-				      const string& path)
+void ZooKeeperMasterDetectorProcess::created(const string& path)
 {
-  if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
-    // Check if this is a reconnect.
-    if (!reconnect) {
-      // Initial connect.
-      connected();
-    } else {
-      // Reconnected.
-      reconnected();
-    }
-  } else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
-    // The client library automatically reconnects, taking into
-    // account failed servers in the connection string,
-    // appropriately handling the "herd effect", etc.
-    reconnect = true;
-    reconnecting();
-  } else if ((state == ZOO_EXPIRED_SESSION_STATE) && (type == ZOO_SESSION_EVENT)) {
-    // Session expiration. Let the manager take care of it.
-    expired();
-
-    // If this watcher is reused, the next connect won't be a reconnect.
-    reconnect = false;
-  } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
-    updated(path);
-  } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
-    updated(path);
-  } else {
-    LOG(WARNING) << "Unimplemented watch event: (state is "
-		 << state << " and type is " << type << ")";
-  }
+  LOG(FATAL) << "Unexpected ZooKeeper event (created) for '" << path << "'";
+}
+
+
+void ZooKeeperMasterDetectorProcess::deleted(const string& path)
+{
+  LOG(FATAL) << "Unexpected ZooKeeper event (deleted) for '" << path << "'";
 }
 
 
-void ZooKeeperMasterDetector::detectMaster()
+void ZooKeeperMasterDetectorProcess::detectMaster()
 {
   vector<string> results;
 
@@ -495,3 +497,22 @@ void ZooKeeperMasterDetector::detectMast
     }
   }
 }
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(
+    const zookeeper::URL& url,
+    const UPID& pid,
+    bool contend,
+    bool quiet)
+{
+  process = new ZooKeeperMasterDetectorProcess(url, pid, contend, quiet);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}