You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:24:14 UTC

svn commit: r1131740 - in /incubator/mesos/trunk/src: master_detector.cpp master_detector.hpp zookeeper.cpp zookeeper.hpp

Author: benh
Date: Sun Jun  5 05:24:14 2011
New Revision: 1131740

URL: http://svn.apache.org/viewvc?rev=1131740&view=rev
Log:
New master_detector implementation that uses libprocess ZooKeeper API.

Added:
    incubator/mesos/trunk/src/master_detector.cpp
    incubator/mesos/trunk/src/master_detector.hpp
Modified:
    incubator/mesos/trunk/src/zookeeper.cpp
    incubator/mesos/trunk/src/zookeeper.hpp

Added: incubator/mesos/trunk/src/master_detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.cpp?rev=1131740&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master_detector.cpp (added)
+++ incubator/mesos/trunk/src/master_detector.cpp Sun Jun  5 05:24:14 2011
@@ -0,0 +1,166 @@
+#include <unistd.h>
+
+#include <process.hpp>
+
+#include <iostream>
+#include <climits>
+#include <cstdlib>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+#include <boost/lexical_cast.hpp>
+
+#include "fatal.hpp"
+#include "master_detector.hpp"
+#include "messages.hpp"
+
+using namespace nexus;
+using namespace nexus::internal;
+
+using boost::lexical_cast;
+
+
+/*
+ * Records the connection parameters and opens a ZooKeeper handle that
+ * delivers its watch events to this detector (see process()).
+ */
+MasterDetector::MasterDetector(const string &_servers, const string &_znode,
+                               const PID &_pid, bool _contend)
+  : servers(_servers), znode(_znode), pid(_pid), contend(_contend)
+{
+  // Session timeout passed to the ZooKeeper wrapper; presumably
+  // milliseconds, following the zookeeper_init convention -- TODO confirm.
+  const int sessionTimeout = 10000;
+
+  // Created last (in the body, not the initializer list) so every other
+  // member is fully constructed before 'this' is handed out as watcher.
+  zk = new ZooKeeper(servers, sessionTimeout, this);
+}
+
+
+/* Releases the ZooKeeper handle created by the constructor. */
+MasterDetector::~MasterDetector()
+{
+  // Deleting a NULL pointer is a no-op, so no explicit guard is needed.
+  delete zk;
+}
+
+
+// Returns 'path' without any trailing '/' characters ("/" itself is
+// returned unchanged). ZooKeeper does not accept a non-sequential znode
+// path ending in '/', so the directory znode is always addressed in
+// this form, whether or not the caller's 'znode' ended with a slash.
+static string stripTrailingSlashes(const string &path)
+{
+  string result = path;
+  while (result.size() > 1 && result[result.size() - 1] == '/')
+    result.erase(result.size() - 1);
+  return result;
+}
+
+
+// Returns the prefix under which children of the directory znode 'path'
+// are named: "/nxmaster" and "/nxmaster/" both yield "/nxmaster/".
+// Appending a sequence number to it gives that child's full path.
+static string childPrefix(const string &path)
+{
+  const string directory = stripTrailingSlashes(path);
+  return directory == "/" ? directory : directory + "/";
+}
+
+
+/*
+ * ZooKeeper watcher callback.
+ *
+ * On a new connected session this creates the directory znode (and any
+ * missing ancestors). When contending, it then creates an
+ * ephemeral/sequence child of that directory whose data is our pid,
+ * and reports the assigned sequence number to 'pid' with GOT_MASTER_SEQ.
+ * Finally it runs master detection. Child-change events re-run
+ * detection; any other event is only logged.
+ */
+void MasterDetector::process(ZooKeeper *zk, int type, int state,
+                             const string &path)
+{
+  int ret;
+  string result;
+
+  static const string delimiter = "/";
+
+  if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+    // The directory znode without a trailing '/', e.g. "/nxmaster".
+    const string directory = stripTrailingSlashes(znode);
+
+    // Create directory path znodes as necessary, outermost first.
+    size_t index = directory.find(delimiter, 0);
+    while (index < string::npos) {
+      index = directory.find(delimiter, index+1);
+      string prefix = directory.substr(0, index);
+      ret = zk->create(prefix, "", ZOO_CREATOR_ALL_ACL,
+                       0, &result);
+      if (ret != ZOK && ret != ZNODEEXISTS)
+        fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+    }
+
+    // Weirdness in ZooKeeper timing, let's check that everything is created.
+    ret = zk->get(directory, false, &result, NULL);
+
+    if (ret != ZOK)
+      fatal("ZooKeeper not responding correctly (%s). "
+            "Make sure ZooKeeper is running on: %s",
+            zk->error(ret), servers.c_str());
+
+    if (contend) {
+      // Contend by creating an ephemeral/sequence child of the
+      // directory (e.g. "/nxmaster/0000000042") holding our pid.
+      ret = zk->create(childPrefix(znode), pid, ZOO_CREATOR_ALL_ACL,
+                       ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+
+      if (ret != ZOK)
+        fatal("ZooKeeper not responding correctly (%s). "
+              "Make sure ZooKeeper is running on: %s",
+              zk->error(ret), servers.c_str());
+
+      setMySeq(result);
+      LOG(INFO) << "Created ephemeral/sequence:" << getMySeq();
+
+      const string &s =
+        Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_SEQ>(getMySeq()));
+      Process::post(pid, GOT_MASTER_SEQ, s.data(), s.size());
+    }
+
+    detectMaster();
+  } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
+    detectMaster();
+  } else {
+    LOG(INFO) << "Unimplemented watch event: (state is "
+              << state << " and type is " << type << ")";
+  }
+}
+
+
+/*
+ * Lists the contenders (children of the directory znode) and asks
+ * ZooKeeper to watch them, so that process() is called on changes.
+ * The child with the lowest sequence number is the master.
+ *
+ * Posts NO_MASTER_DETECTED when no usable master is found. Posts
+ * NEW_MASTER_DETECTED when the master differs from the one last
+ * recorded in currentMasterSeq.
+ */
+void MasterDetector::detectMaster()
+{
+  vector<string> results;
+
+  int ret = zk->getChildren(stripTrailingSlashes(znode), true, &results);
+
+  if (ret != ZOK)
+    LOG(ERROR) << "failed to get masters: " << zk->error(ret);
+  else
+    LOG(INFO) << "found " << results.size() << " registered masters";
+
+  string masterSeq;
+  long min = LONG_MAX;
+  foreach (const string &result, results) {
+    long i;
+    try {
+      i = lexical_cast<long>(result);
+    } catch (const boost::bad_lexical_cast &) {
+      // Not a sequence number; don't let a stray znode abort detection.
+      LOG(WARNING) << "ignoring unexpected znode '" << result << "'";
+      continue;
+    }
+    if (i < min) {
+      min = i;
+      masterSeq = result;
+    }
+  }
+
+  if (masterSeq.empty()) {
+    // No master present (lost or possibly hasn't come up yet). Forget
+    // the previous master so that it is reported again if it shows up
+    // in a later listing (e.g. after a transient getChildren failure).
+    currentMasterSeq = "";
+    currentMasterPID = PID();
+    const string &s =
+      Tuple<Process>::tupleToString(Tuple<Process>::pack<NO_MASTER_DETECTED>());
+    Process::post(pid, NO_MASTER_DETECTED, s.data(), s.size());
+  } else if (masterSeq != currentMasterSeq) {
+    currentMasterPID = lookupMasterPID(masterSeq);
+
+    // While trying to get the master PID, master might have crashed,
+    // so PID might be empty.
+    if (currentMasterPID == PID()) {
+      // Don't record this sequence number, so a later event retries
+      // the lookup instead of silently treating the master as known.
+      currentMasterSeq = "";
+      const string &s =
+        Tuple<Process>::tupleToString(Tuple<Process>::pack<NO_MASTER_DETECTED>());
+      Process::post(pid, NO_MASTER_DETECTED, s.data(), s.size());
+    } else {
+      currentMasterSeq = masterSeq;
+      const string &s =
+        Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>(currentMasterSeq, currentMasterPID));
+      Process::post(pid, NEW_MASTER_DETECTED, s.data(), s.size());
+    }
+  }
+}
+
+
+/*
+ * Reads the data of the contender znode with sequence number 'seq'
+ * (a child of the directory znode) and parses it as a libprocess PID.
+ * NOTE(review): on a failed read 'result' stays empty. That presumably
+ * makes the PID equal PID(), which detectMaster() treats as "no master".
+ */
+PID MasterDetector::lookupMasterPID(const string &seq) const
+{
+  CHECK(!seq.empty());
+
+  string result;
+  int ret = zk->get(childPrefix(znode) + seq, false, &result, NULL);
+
+  if (ret != ZOK)
+    LOG(ERROR) << "failed to fetch new master pid: " << zk->error(ret);
+  else
+    LOG(INFO) << "got new master pid: " << result;
+
+  // TODO(benh): Automatic cast!
+  return make_pid(result.c_str());
+}
+
+
+/* Sequence number of the master as last recorded by detectMaster(). */
+string MasterDetector::getCurrentMasterSeq() const
+{
+  return this->currentMasterSeq;
+}
+
+
+/* PID of the master as last recorded by detectMaster(). */
+PID MasterDetector::getCurrentMasterPID() const
+{
+  return this->currentMasterPID;
+}

Added: incubator/mesos/trunk/src/master_detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.hpp?rev=1131740&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master_detector.hpp (added)
+++ incubator/mesos/trunk/src/master_detector.hpp Sun Jun  5 05:24:14 2011
@@ -0,0 +1,105 @@
+#ifndef __MASTER_DETECTOR_HPP__
+#define __MASTER_DETECTOR_HPP__
+
+#include <string>
+#include <iostream>
+#include <unistd.h>
+#include <climits>
+#include <cstdlib>
+
+#include "zookeeper.hpp"
+
+using namespace std;
+
+
+/**
+ * Implements functionality for a) detecting masters b) contending to be a master.
+ *
+ * Contenders are ephemeral/sequence children of a ZooKeeper directory
+ * znode; the child with the lowest sequence number is the master.
+ * Results are posted as messages (GOT_MASTER_SEQ, NEW_MASTER_DETECTED,
+ * NO_MASTER_DETECTED) to the pid given to the constructor.
+ */
+class MasterDetector : public Watcher {
+public:
+  /**
+   * Connects to ZooKeeper and possibly contends for master.
+   *
+   * @param server comma separated list of zookeeper server host:port
+   *   pairs
+   * @param znode ZooKeeper node (directory)
+   * @param _pid libprocess pid that we should send messages to (and to
+   *   contend)
+   * @param contend true if should contend to be master (not needed
+   *   for slaves and frameworks)
+   */
+  MasterDetector(const string &server, const string &znode,
+                 const PID &_pid, bool contend = false);
+
+  /**
+   * Deletes the ZooKeeper handle created by the constructor.
+   */
+  virtual ~MasterDetector();
+
+  /**
+   * ZooKeeper watcher.
+   */
+  virtual void process(ZooKeeper *zk, int type, int state, const string &path);
+
+  /**
+   * @return ZooKeeper unique sequence number of the current master.
+   */
+  string getCurrentMasterSeq() const;
+
+  /**
+   * @return libprocess PID of the current master.
+   */
+  PID getCurrentMasterPID() const;
+
+  /**
+   * @return Unique ZooKeeper sequence number (only if contending for master, otherwise "").
+   */
+  string getMySeq() const
+  {
+    return mySeq;
+  }
+
+  /**
+   * Adjusts ZooKeeper's level of debugging output.
+   * @param quiet true limits ZK output to errors, whereas false makes ZK output DEBUG messages
+   */
+  static void setQuiet(bool quiet)
+  {
+    zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
+  }
+
+private:
+  // Not copyable. A detector owns 'zk' (deleted by the destructor) and
+  // is registered as that handle's watcher, so a copy would end up
+  // deleting the same handle twice. Declared but intentionally not
+  // defined (pre-C++11 idiom).
+  MasterDetector(const MasterDetector &);
+  MasterDetector &operator = (const MasterDetector &);
+
+  /**
+   * Records our own sequence number from the full path ZooKeeper
+   * returned for our contender znode. For example,
+   * "/nxmaster/000000131" becomes "000000131" ("" if the path
+   * contains no '/').
+   */
+  void setMySeq(const string &s)
+  {
+    const string::size_type pos = s.find_last_of('/');
+    mySeq = (pos != string::npos) ? s.substr(pos + 1) : string();
+  }
+
+  /**
+   * Lists the children of the znode, asking ZooKeeper to watch them so
+   * that process() is invoked on changes, and takes the lowest sequence
+   * number as the master. Posts NO_MASTER_DETECTED to 'pid' when no
+   * usable master is found, or NEW_MASTER_DETECTED when the master
+   * changed.
+   */
+  void detectMaster();
+
+  /**
+   * Fetches the data of the contender znode with sequence number 'seq'
+   * and parses it as a libprocess PID. NOTE(review): a failed read
+   * presumably yields an empty PID(), which detectMaster() treats as
+   * "no master".
+   */
+  PID lookupMasterPID(const string &seq) const;
+
+  string servers;
+  string znode;
+  PID pid;
+  bool contend;
+
+  // Owned; created by the constructor, deleted by the destructor.
+  ZooKeeper *zk;
+
+  // Our sequence number if contending.
+  string mySeq;
+
+  string currentMasterSeq;
+  PID currentMasterPID;
+};
+
+#endif /* __MASTER_DETECTOR_HPP__ */
+

Modified: incubator/mesos/trunk/src/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.cpp?rev=1131740&r1=1131739&r2=1131740&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper.cpp Sun Jun  5 05:24:14 2011
@@ -14,6 +14,7 @@ using std::cout;
 using std::endl;
 using std::map;
 using std::string;
+using std::vector;
 
 
 /* Forward (and first) declaration of ZooKeeperProcess. */
@@ -41,6 +42,7 @@ enum {
   REMOVE, // Perform an asynchronous remove (delete).
   EXISTS, // Perform an asysnchronous exists.
   GET, // Perform an asynchronous get.
+  GET_CHILDREN, // Perform an asynchronous get_children.
   SET, // Perform an asynchronous set.
 };
 
@@ -105,6 +107,18 @@ struct GetCall
 };
 
 
+/*
+ * GetChildren "message" for performing ZooKeeper::getChildren.
+ * ZooKeeper::getChildren fills in path/watch/results; ZooKeeperProcess
+ * fills in from/zooKeeperProcess; 'ret' receives the result code.
+ */
+struct GetChildrenCall
+{
+  int ret;                             // Result code reported back to the caller.
+  const string *path;                  // Node whose children are listed.
+  bool watch;                          // Whether to leave a watch on the node.
+  vector<string> *results;             // Child names are appended here (may be NULL).
+  PID from;                            // Process to notify with COMPLETED.
+  ZooKeeperProcess *zooKeeperProcess;  // Process that issued the request.
+};
+
+
 /* Set "message" for performing ZooKeeper::set. */
 struct SetCall
 {
@@ -346,6 +360,20 @@ private:
     getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
   }
 
+  /*
+   * Completion callback for zoo_aget_children. It must be static: it is
+   * handed to the ZooKeeper C client as a plain function pointer, like
+   * the other *Completion callbacks. Copies the returned child names
+   * (if any) into the caller's vector, records the result code and
+   * wakes the waiting caller with COMPLETED.
+   */
+  static void getChildrenCompletion(int ret, const String_vector *results,
+                                    const void *data)
+  {
+    GetChildrenCall *getChildrenCall =
+      static_cast<GetChildrenCall *>(const_cast<void *>(data));
+    getChildrenCall->ret = ret;
+    // Either side may be NULL: the caller may not want the names, and
+    // presumably no list is supplied when the request failed.
+    if (getChildrenCall->results != NULL && results != NULL) {
+      for (int i = 0; i < results->count; i++) {
+        getChildrenCall->results->push_back(results->data[i]);
+      }
+    }
+    getChildrenCall->zooKeeperProcess->send(getChildrenCall->from, COMPLETED);
+  }
+
   static void setCompletion(int ret, const Stat *stat, const void *data)
   {
     SetCall *setCall =
@@ -487,6 +515,21 @@ protected:
 	    }
 	    break;
 	  }
+	  case GET_CHILDREN: {
+	    GetChildrenCall *getChildrenCall =
+	      *reinterpret_cast<GetChildrenCall **>(const_cast<char *>(body(NULL)));
+	    getChildrenCall->from = from();
+	    getChildrenCall->zooKeeperProcess = this;
+	    int ret = zoo_aget_children(zh, getChildrenCall->path->c_str(),
+					getChildrenCall->watch,
+					getChildrenCompletion,
+					getChildrenCall);
+	    if (ret != ZOK) {
+	      getChildrenCall->ret = ret;
+	      send(getChildrenCall->from, COMPLETED);
+	    }
+	    break;
+	  }
 	  case SET: {
 	    SetCall *setCall =
 	      *reinterpret_cast<SetCall **>(const_cast<char *>(body(NULL)));
@@ -722,6 +765,46 @@ int ZooKeeper::get(const string &path,
 }
 
 
+/*
+ * Synchronously lists the children of 'path'. A short-lived helper
+ * process sends a GET_CHILDREN request to the ZooKeeperProcess and
+ * waits for COMPLETED. The caller blocks (Process::wait) until that
+ * helper finishes, then receives the ZooKeeper result code.
+ */
+int ZooKeeper::getChildren(const string &path,
+                           bool watch,
+                           vector<string> *results)
+{
+  GetChildrenCall getChildrenCall;
+  getChildrenCall.path = &path;
+  getChildrenCall.watch = watch;
+  getChildrenCall.results = results;
+
+  class GetChildrenCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *zooKeeperProcess;
+    GetChildrenCall *getChildrenCall;
+
+  protected:
+    void operator () ()
+    {
+      // The message body is the *pointer* to the call: the GET_CHILDREN
+      // handler reinterprets it as a GetChildrenCall **. Using any other
+      // message id (e.g. GET) would make ZooKeeperProcess misread it.
+      if (call(zooKeeperProcess->getPID(),
+               GET_CHILDREN,
+               reinterpret_cast<char *>(&getChildrenCall),
+               sizeof(GetChildrenCall *)) != COMPLETED)
+        getChildrenCall->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    GetChildrenCallProcess(ZooKeeperProcess *_zooKeeperProcess,
+                           GetChildrenCall *_getChildrenCall)
+      : zooKeeperProcess(_zooKeeperProcess),
+        getChildrenCall(_getChildrenCall)
+    {}
+  } getChildrenCallProcess(static_cast<ZooKeeperProcess *>(impl),
+                           &getChildrenCall);
+
+  Process::wait(Process::spawn(&getChildrenCallProcess));
+
+  return getChildrenCall.ret;
+}
+
+
 int ZooKeeper::set(const string &path,
 		   const string &data,
 		   int version)

Modified: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131740&r1=1131739&r2=1131740&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun  5 05:24:14 2011
@@ -19,6 +19,7 @@
 #include <zookeeper.h>
 
 #include <string>
+#include <vector>
 
 
 /* Forward declarations of classes we are using. */
@@ -230,6 +231,26 @@ public:
 	  Stat *stat);
 
   /**
+   * \brief lists the children of a node synchronously.
+   *
+   * The calling thread blocks until the request has completed.
+   *
+   * \param path the name of the node. Expressed as a file name with
+   *   slashes separating ancestors of the node.
+   * \param watch if true, a watch will be set at the server to notify
+   *   the client if the node changes (for this call, presumably when its
+   *   set of children changes -- see the zoo_get_children documentation).
+   * \param results if non-NULL, the child names returned by the server
+   *   are appended to this vector; existing contents are not cleared.
+   * \return the return code of the function.
+   * ZOK operation completed successfully
+   * ZNONODE the node does not exist.
+   * ZNOAUTH the client does not have permission.
+   * ZBADARGUMENTS - invalid input parameters
+   * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+   * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+   * ZSYSTEMERROR - the request could not be completed by the ZooKeeper process
+   */
+  int getChildren(const std::string &path,
+		  bool watch,
+		  std::vector<std::string> *results);
+
+  /**
    * \brief sets the data associated with a node.
    * 
    * \param path the name of the node. Expressed as a file name with slashes