You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/01/23 00:59:46 UTC

[3/4] git commit: Updated master and contender to write znode(s) in the new (labelled) format.

Updated master and contender to write znode(s) in the new (labelled)
format.

Review: https://reviews.apache.org/r/17173


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586e7eb6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586e7eb6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586e7eb6

Branch: refs/heads/master
Commit: 586e7eb65d6d376743725f64a481150171d44916
Parents: 4382a0f
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 20:12:46 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800

----------------------------------------------------------------------
 src/master/contender.cpp                      | 28 ++++++++-------
 src/master/contender.hpp                      | 12 ++++---
 src/master/master.cpp                         |  3 +-
 src/tests/master_contender_detector_tests.cpp | 41 ++++++++++++++--------
 src/tests/zookeeper_tests.cpp                 | 12 ++++---
 src/zookeeper/contender.cpp                   | 29 +++++++++------
 src/zookeeper/contender.hpp                   |  8 +++--
 7 files changed, 84 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
index 89e368b..e3b0737 100644
--- a/src/master/contender.cpp
+++ b/src/master/contender.cpp
@@ -22,6 +22,7 @@
 #include <stout/check.hpp>
 #include <stout/lambda.hpp>
 
+#include "master/constants.hpp"
 #include "master/contender.hpp"
 #include "master/master.hpp"
 
@@ -54,7 +55,7 @@ public:
   // Explicitely use 'initialize' since we're overloading below.
   using process::ProcessBase::initialize;
 
-  void initialize(const PID<Master>& master);
+  void initialize(const MasterInfo& masterInfo);
 
   // MasterContender implementation.
   virtual Future<Future<Nothing> > contend();
@@ -64,7 +65,7 @@ private:
   LeaderContender* contender;
 
   // The master this contender contends on behalf of.
-  Option<PID<Master> > master;
+  Option<MasterInfo> masterInfo;
   Option<Future<Future<Nothing> > > candidacy;
 };
 
@@ -109,8 +110,7 @@ StandaloneMasterContender::~StandaloneMasterContender()
 }
 
 
-void StandaloneMasterContender::initialize(
-    const PID<master::Master>& master)
+void StandaloneMasterContender::initialize(const MasterInfo& masterInfo)
 {
   // We don't really need to store the master in this basic
   // implementation so we just restore an 'initialized' flag to make
@@ -161,10 +161,9 @@ ZooKeeperMasterContender::~ZooKeeperMasterContender()
 }
 
 
-void ZooKeeperMasterContender::initialize(
-    const PID<master::Master>& master)
+void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
 {
-  process->initialize(master);
+  process->initialize(masterInfo);
 }
 
 
@@ -191,16 +190,15 @@ ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
   delete contender;
 }
 
-void ZooKeeperMasterContenderProcess::initialize(
-    const PID<Master>& _master)
+void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
 {
-  master = _master;
+  masterInfo = _masterInfo;
 }
 
 
 Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
 {
-  if (master.isNone()) {
+  if (masterInfo.isNone()) {
     return Failure("Initialize the contender first");
   }
 
@@ -214,7 +212,13 @@ Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
     delete contender;
   }
 
-  contender = new LeaderContender(group.get(), master.get());
+  // Serialize the MasterInfo to string.
+  string data;
+  if (!masterInfo.get().SerializeToString(&data)) {
+    return Failure("Failed to serialize data to MasterInfo");
+  }
+
+  contender = new LeaderContender(group.get(), data, master::MASTER_INFO_LABEL);
   candidacy = contender->contend();
   return candidacy.get();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender.hpp b/src/master/contender.hpp
index 2a7e7c4..0048ee0 100644
--- a/src/master/contender.hpp
+++ b/src/master/contender.hpp
@@ -27,6 +27,8 @@
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 
+#include "messages/messages.hpp"
+
 #include "zookeeper/contender.hpp"
 #include "zookeeper/group.hpp"
 #include "zookeeper/url.hpp"
@@ -63,9 +65,9 @@ public:
   // to be cancelled during destruction.
   virtual ~MasterContender() = 0;
 
-  // Initializes the contender with the PID of the master it contends
-  // on behalf of.
-  virtual void initialize(const process::PID<master::Master>& master) = 0;
+  // Initializes the contender with the MasterInfo of the master it
+  // contends on behalf of.
+  virtual void initialize(const MasterInfo& masterInfo) = 0;
 
   // Returns a Future<Nothing> once the contender has entered the
   // contest (by obtaining a membership) and an error otherwise.
@@ -94,7 +96,7 @@ public:
   virtual ~StandaloneMasterContender();
 
   // MasterContender implementation.
-  virtual void initialize(const process::PID<master::Master>& master);
+  virtual void initialize(const MasterInfo& masterInfo);
 
   // In this basic implementation the outer Future directly returns
   // and inner Future stays pending because there is only one
@@ -118,7 +120,7 @@ public:
   virtual ~ZooKeeperMasterContender();
 
   // MasterContender implementation.
-  virtual void initialize(const process::PID<master::Master>& master);
+  virtual void initialize(const MasterInfo& masterInfo);
   virtual process::Future<process::Future<Nothing> > contend();
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 008033e..c7d9186 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -294,6 +294,7 @@ void Master::initialize()
   info.set_id(id.get());
   info.set_ip(self().ip);
   info.set_port(self().port);
+  info.set_pid(self());
 
   LOG(INFO) << "Master ID: " << info.id();
 
@@ -543,7 +544,7 @@ void Master::initialize()
     }
   }
 
-  contender->initialize(self());
+  contender->initialize(info);
 
   // Start contending to be a leading master and detecting the current
   // leader.

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index a739f89..5223200 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -80,6 +80,19 @@ using testing::AtMost;
 using testing::Return;
 
 
+// Helper function that creates a MasterInfo from PID<Master>.
+static MasterInfo createMasterInfo(const PID<Master>& master)
+{
+  MasterInfo masterInfo;
+  masterInfo.set_id(UUID::random().toString());
+  masterInfo.set_ip(master.ip);
+  masterInfo.set_port(master.port);
+  masterInfo.set_pid(master);
+
+  return masterInfo;
+}
+
+
 class MasterContenderDetectorTest : public MesosTest {};
 
 
@@ -133,7 +146,7 @@ TEST(BasicMasterContenderDetectorTest, Contender)
 
   MasterContender* contender = new StandaloneMasterContender();
 
-  contender->initialize(master);
+  contender->initialize(createMasterInfo(master));
 
   Future<Future<Nothing> > contended = contender->contend();
   AWAIT_READY(contended);
@@ -190,7 +203,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
   master.ip = 10000000;
   master.port = 10000;
 
-  contender->initialize(master);
+  contender->initialize(createMasterInfo(master));
   Future<Future<Nothing> > contended = contender->contend();
   AWAIT_READY(contended);
 
@@ -228,7 +241,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
   master.ip = 10000000;
   master.port = 10000;
 
-  contender.initialize(master);
+  contender.initialize(createMasterInfo(master));
 
   // Drop Group::join so that 'contended' will stay pending.
   Future<Nothing> join = DROP_DISPATCH(_, &GroupProcess::join);
@@ -281,7 +294,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
   master1.ip = 10000000;
   master1.port = 10000;
 
-  contender1->initialize(master1);
+  contender1->initialize(createMasterInfo(master1));
 
   Future<Future<Nothing> > contended1 = contender1->contend();
   AWAIT_READY(contended1);
@@ -298,7 +311,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
   master2.ip = 10000001;
   master2.port = 10001;
 
-  contender2.initialize(master2);
+  contender2.initialize(createMasterInfo(master2));
 
   Future<Future<Nothing> > contended2 = contender2.contend();
   AWAIT_READY(contended2);
@@ -343,7 +356,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
       "/mesos",
       zookeeper::Authentication("digest", "member:wrongpass")));
   ZooKeeperMasterContender contender(group2);
-  contender.initialize(master);
+  contender.initialize(createMasterInfo(master));
 
   // Fails due to authentication error.
   AWAIT_FAILED(contender.contend());
@@ -399,7 +412,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
   master.ip = 10000000;
   master.port = 10000;
 
-  contender.initialize(master);
+  contender.initialize(createMasterInfo(master));
 
   Future<Future<Nothing> > contended = contender.contend();
   AWAIT_READY(contended);
@@ -474,7 +487,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
   leader.ip = 10000000;
   leader.port = 10000;
 
-  leaderContender.initialize(leader);
+  leaderContender.initialize(createMasterInfo(leader));
 
   Future<Future<Nothing> > contended = leaderContender.contend();
   AWAIT_READY(contended);
@@ -494,7 +507,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
   follower.ip = 10000001;
   follower.port = 10001;
 
-  followerContender.initialize(follower);
+  followerContender.initialize(createMasterInfo(follower));
 
   contended = followerContender.contend();
   AWAIT_READY(contended);
@@ -586,7 +599,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
 
   ZooKeeperMasterContender leaderContender(group);
 
-  leaderContender.initialize(leader);
+  leaderContender.initialize(createMasterInfo(leader));
 
   Future<Future<Nothing> > leaderContended = leaderContender.contend();
   AWAIT_READY(leaderContended);
@@ -610,7 +623,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
 
   ZooKeeperMasterDetector followerDetector(url.get());
   ZooKeeperMasterContender followerContender(url.get());
-  followerContender.initialize(follower);
+  followerContender.initialize(createMasterInfo(follower));
 
   Future<Future<Nothing> > followerContended = followerContender.contend();
   AWAIT_READY(followerContended);
@@ -655,7 +668,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
   master.port = 10000;
 
   ZooKeeperMasterContender masterContender(url.get());
-  masterContender.initialize(master);
+  masterContender.initialize(createMasterInfo(master));
 
   Future<Future<Nothing> > leaderContended = masterContender.contend();
   AWAIT_READY(leaderContended);
@@ -714,7 +727,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
   leader.ip = 10000000;
   leader.port = 10000;
 
-  leaderContender.initialize(leader);
+  leaderContender.initialize(createMasterInfo(leader));
 
   Future<Future<Nothing> > contended = leaderContender.contend();
   AWAIT_READY(contended);
@@ -733,7 +746,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
   follower.ip = 10000001;
   follower.port = 10001;
 
-  followerContender.initialize(follower);
+  followerContender.initialize(createMasterInfo(follower));
 
   contended = followerContender.contend();
   AWAIT_READY(contended);

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index c7fa750..615338a 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -29,6 +29,8 @@
 #include <stout/gtest.hpp>
 #include <stout/strings.hpp>
 
+#include "master/constants.hpp"
+
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/contender.hpp"
 #include "zookeeper/detector.hpp"
@@ -274,7 +276,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
   Group group(server->connectString(), timeout, "/test/");
 
   Owned<LeaderContender> contender(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
 
   // Calling withdraw before contending returns 'false' because there
   // is nothing to withdraw.
@@ -292,7 +294,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Normal workflow.
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
 
   Future<Future<Nothing> > candidated = contender->contend();
   AWAIT_READY(candidated);
@@ -320,7 +322,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Contend again.
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
   candidated = contender->contend();
 
   AWAIT_READY(connected);
@@ -344,7 +346,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Contend (3) and shutdown the network this time.
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
   candidated = contender->contend();
   AWAIT_READY(candidated);
   lostCandidacy = candidated.get();
@@ -369,7 +371,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Contend again (4).
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
   candidated = contender->contend();
   AWAIT_READY(candidated);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
index d8a5201..6710da4 100644
--- a/src/zookeeper/contender.cpp
+++ b/src/zookeeper/contender.cpp
@@ -8,6 +8,7 @@
 #include <stout/check.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
+#include <stout/some.hpp>
 
 #include "zookeeper/contender.hpp"
 #include "zookeeper/detector.hpp"
@@ -23,7 +24,11 @@ namespace zookeeper {
 class LeaderContenderProcess : public Process<LeaderContenderProcess>
 {
 public:
-  LeaderContenderProcess(Group* group, const std::string& data);
+  LeaderContenderProcess(
+      Group* group,
+      const string& data,
+      const Option<string>& label);
+
   virtual ~LeaderContenderProcess();
 
   // LeaderContender implementation.
@@ -45,6 +50,7 @@ private:
 
   Group* group;
   const string data;
+  const Option<string> label;
 
   // The contender's state transitions from contending -> watching ->
   // withdrawing or contending -> withdrawing. Each state is
@@ -70,9 +76,9 @@ private:
 
 LeaderContenderProcess::LeaderContenderProcess(
     Group* _group,
-    const string& _data)
-  : group(_group),
-    data(_data) {}
+    const string& _data,
+    const Option<string>& _label)
+  : group(_group), data(_data), label(_label) {}
 
 
 LeaderContenderProcess::~LeaderContenderProcess()
@@ -118,8 +124,8 @@ Future<Future<Nothing> > LeaderContenderProcess::contend()
     return Failure("Cannot contend more than once");
   }
 
-  LOG(INFO) << "Joining the ZK group with data: '" << data << "'";
-  candidacy = group->join(data);
+  LOG(INFO) << "Joining the ZK group";
+  candidacy = group->join(data, label);
   candidacy
     .onAny(defer(self(), &Self::joined));
 
@@ -234,8 +240,8 @@ void LeaderContenderProcess::joined()
     return;
   }
 
-  LOG(INFO) << "New candidate (id='" << candidacy.get().id() << "', data='"
-            << data << "') has entered the contest for leadership";
+  LOG(INFO) << "New candidate (id='" << candidacy.get().id()
+            << "') has entered the contest for leadership";
 
   // Transition to 'watching' state.
   watching = new Promise<Nothing>();
@@ -250,9 +256,12 @@ void LeaderContenderProcess::joined()
 }
 
 
-LeaderContender::LeaderContender(Group* group, const string& data)
+LeaderContender::LeaderContender(
+    Group* group,
+    const string& data,
+    const Option<string>& label)
 {
-  process = new LeaderContenderProcess(group, data);
+  process = new LeaderContenderProcess(group, data, label);
   spawn(process);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/zookeeper/contender.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.hpp b/src/zookeeper/contender.hpp
index e526e2d..6529245 100644
--- a/src/zookeeper/contender.hpp
+++ b/src/zookeeper/contender.hpp
@@ -6,6 +6,7 @@
 #include <process/future.hpp>
 
 #include <stout/nothing.hpp>
+#include <stout/option.hpp>
 
 #include "zookeeper/group.hpp"
 
@@ -25,8 +26,11 @@ class LeaderContender
 public:
   // The specified 'group' is expected to outlive the contender. The
   // specified 'data' is associated with the group membership created
-  // by this contender.
-  LeaderContender(Group* group, const std::string& data);
+  // by this contender. 'label' indicates the label for the znode that
+  // stores the 'data'.
+  LeaderContender(Group* group,
+                  const std::string& data,
+                  const Option<std::string>& label);
 
   // Note that the contender's membership, if obtained, is scheduled
   // to be cancelled during destruction.