You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/01/23 00:59:44 UTC

[1/4] git commit: Fixed MESOS-935: Group should tell MasterDetector "no memberships detected" when it locally times out.

Updated Branches:
  refs/heads/master 7321c385f -> 586e7eb65


Fixed MESOS-935: Group should tell MasterDetector "no memberships
detected" when it locally times out.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/17156


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42946ed5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42946ed5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42946ed5

Branch: refs/heads/master
Commit: 42946ed574e117c51b3f5f0a4be7636b96404d18
Parents: 7321c38
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jan 22 15:38:45 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:10 2014 -0800

----------------------------------------------------------------------
 src/master/detector.hpp                       |   3 +
 src/master/master.hpp                         |   7 +-
 src/tests/master_contender_detector_tests.cpp |  49 +++++----
 src/tests/master_tests.cpp                    | 112 +++++++++++++++++++++
 src/tests/zookeeper_tests.cpp                 |  28 ++++--
 src/zookeeper/detector.hpp                    |   3 +-
 src/zookeeper/group.cpp                       |  16 ++-
 7 files changed, 183 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
index 8df2d16..277c9d9 100644
--- a/src/master/detector.hpp
+++ b/src/master/detector.hpp
@@ -107,6 +107,9 @@ public:
   virtual ~ZooKeeperMasterDetector();
 
   // MasterDetector implementation.
+  // The detector transparently tries to recover from retryable
+  // errors until the group session expires, in which case the
+  // returned Future is satisfied with None.
   virtual process::Future<Option<process::UPID> > detect(
       const Option<process::UPID>& previous = None());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 18a6cc4..99b8181 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -177,6 +177,10 @@ public:
   // Made public for testing purposes.
   void detected(const Future<Option<UPID> >& pid);
 
+  // Invoked when the contender has lost the candidacy.
+  // Made public for testing purposes.
+  void lostCandidacy(const Future<Nothing>& lost);
+
 protected:
   virtual void initialize();
   virtual void finalize();
@@ -201,9 +205,6 @@ protected:
   // Invoked when the contender has entered the contest.
   void contended(const Future<Future<Nothing> >& candidacy);
 
-  // Invoked when the contender has lost the candidacy.
-  void lostCandidacy(const Future<Nothing>& lost);
-
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
   // tasks) and reporting any unused resources to the allocator.

http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index 9cd576f..a739f89 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -418,7 +418,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
 
   // We may need to advance multiple times because we could have
   // advanced the clock before the timer in Group starts.
-  while (lostCandidacy.isPending()) {
+  while (lostCandidacy.isPending() || leader.isPending()) {
     Clock::advance(MASTER_CONTENDER_ZK_SESSION_TIMEOUT);
     Clock::settle();
   }
@@ -426,9 +426,11 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
   // Local timeout does not fail the future but rather deems the
   // session has timed out and the candidacy is lost.
   EXPECT_TRUE(lostCandidacy.isReady());
+  EXPECT_NONE(leader.get());
 
-  // Re-contend (and continue detecting).
+  // Re-contend and re-detect.
   contended = contender.contend();
+  leader = detector.detect(leader.get());
 
   // Things will not change until the server restarts.
   Clock::advance(Minutes(1));
@@ -540,19 +542,23 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
 
   Clock::pause();
 
-  // We know when the groups have timed out when the contenders are
-  // informed of the candidacy loss.
   // We may need to advance multiple times because we could have
   // advanced the clock before the timer in Group starts.
-  while (leaderLostCandidacy.isPending() || followerLostCandidacy.isPending()) {
+  while (leaderDetected.isPending() ||
+         followerDetected.isPending() ||
+         nonContenderDetected.isPending() ||
+         leaderLostCandidacy.isPending() ||
+         followerLostCandidacy.isPending()) {
     Clock::advance(sessionTimeout);
     Clock::settle();
   }
 
-  // Detection is not interrupted.
-  EXPECT_TRUE(leaderDetected.isPending());
-  EXPECT_TRUE(followerDetected.isPending());
-  EXPECT_TRUE(nonContenderDetected.isPending());
+  EXPECT_NONE(leaderDetected.get());
+  EXPECT_NONE(followerDetected.get());
+  EXPECT_NONE(nonContenderDetected.get());
+
+  EXPECT_TRUE(leaderLostCandidacy.isReady());
+  EXPECT_TRUE(followerLostCandidacy.isReady());
 
   Clock::resume();
 }
@@ -670,18 +676,18 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
   Future<Option<int64_t> > session = group->session();
   AWAIT_READY(session);
 
-  Future<Nothing> connected = FUTURE_DISPATCH(
-      group->process->self(),
-      &GroupProcess::connected);
-
   server->expireSession(session.get().get());
 
-  // When connected is called, the leader has already expired and
-  // reconnected.
-  AWAIT_READY(connected);
+  // Session expiration causes the detector to assume all memberships
+  // are lost.
+  AWAIT_READY(detected);
+  EXPECT_NONE(detected.get());
+
+  detected = slaveDetector.detect(detected.get());
 
-  // Still pending because there is no leader change.
-  EXPECT_TRUE(detected.isPending());
+  // The detector is able to re-detect the master.
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(master, detected.get());
 }
 
 
@@ -758,9 +764,14 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
   server->expireSession(slaveSession.get().get());
   server->expireSession(masterSession.get().get());
 
-  // Wait for session expiration and ensure a new master is detected.
+  // Wait for session expiration; the detector will first receive
+  // a "no master detected" event.
   AWAIT_READY(detected);
+  EXPECT_NONE(detected.get());
 
+  // nonContenderDetector can now re-detect the new master.
+  detected = nonContenderDetector.detect(detected.get());
+  AWAIT_READY(detected);
   EXPECT_SOME_EQ(follower, detected.get());
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d34450b..f1486ce 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -49,6 +49,10 @@
 #include "tests/isolator.hpp"
 #include "tests/mesos.hpp"
 
+#ifdef MESOS_HAS_JAVA
+#include "tests/zookeeper.hpp"
+#endif
+
 using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
@@ -1023,6 +1027,7 @@ TEST_F(MasterTest, MasterLost)
   Shutdown();
 }
 
+
 // Test sends different state than current and expects an update with
 // the current state of task.
 //
@@ -1099,3 +1104,110 @@ TEST_F(MasterTest, ReconcileTaskTest)
 
   Shutdown(); // Must shutdown before 'isolator' gets deallocated.
 }
+
+
+#ifdef MESOS_HAS_JAVA
+class MasterZooKeeperTest : public MesosTest
+{
+public:
+  static void SetUpTestCase()
+  {
+    // Make sure the JVM is created.
+    ZooKeeperTest::SetUpTestCase();
+
+    // Launch the ZooKeeper test server.
+    server = new ZooKeeperTestServer();
+    server->startNetwork();
+
+    Try<zookeeper::URL> parse = zookeeper::URL::parse(
+        "zk://" + server->connectString() + "/znode");
+    ASSERT_SOME(parse);
+
+    url = parse.get();
+  }
+
+  static void TearDownTestCase()
+  {
+    delete server;
+    server = NULL;
+  }
+
+protected:
+  MasterZooKeeperTest() : MesosTest(url) {}
+
+  static ZooKeeperTestServer* server;
+  static Option<zookeeper::URL> url;
+};
+
+
+ZooKeeperTestServer* MasterZooKeeperTest::server = NULL;
+
+
+Option<zookeeper::URL> MasterZooKeeperTest::url;
+
+
+// This test verifies that when the ZooKeeper cluster is lost,
+// master, slave & scheduler all get informed.
+TEST_F(MasterZooKeeperTest, LostZooKeeperCluster)
+{
+  ASSERT_SOME(StartMaster());
+  ASSERT_SOME(StartSlave());
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, stringify(url.get()), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(Return()); // Ignore offers.
+
+  Future<process::Message> frameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+  Future<process::Message> slaveRegisteredMessage =
+    FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _);
+
+  driver.start();
+
+  // Wait for the "registered" messages so that we know the master is
+  // detected by everyone.
+  AWAIT_READY(frameworkRegisteredMessage);
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Future<Nothing> schedulerDisconnected;
+  EXPECT_CALL(sched, disconnected(&driver))
+    .WillOnce(FutureSatisfy(&schedulerDisconnected));
+
+  // Need to drop these two dispatches because otherwise the master
+  // will EXIT.
+  Future<Nothing> masterDetected = DROP_DISPATCH(_, &Master::detected);
+  DROP_DISPATCH(_, &Master::lostCandidacy);
+
+  Future<Nothing> slaveDetected = FUTURE_DISPATCH(_, &Slave::detected);
+
+  server->shutdownNetwork();
+
+  Clock::pause();
+
+  while (schedulerDisconnected.isPending() ||
+         masterDetected.isPending() ||
+         slaveDetected.isPending()) {
+    Clock::advance(MASTER_CONTENDER_ZK_SESSION_TIMEOUT);
+    Clock::settle();
+  }
+
+  Clock::resume();
+
+  // Master, slave and scheduler all lose the leading master.
+  AWAIT_READY(schedulerDisconnected);
+  AWAIT_READY(masterDetected);
+  AWAIT_READY(slaveDetected);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+#endif // MESOS_HAS_JAVA

http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index 656afde..c7fa750 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -203,7 +203,6 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
 
   Future<Group::Membership> membership1 = group.join("member 1");
   AWAIT_READY(membership1);
-  Future<bool> cancelled = membership1.get().cancelled();
 
   Future<Option<Group::Membership> > leader = detector.detect();
 
@@ -218,26 +217,33 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
 
   // We may need to advance multiple times because we could have
   // advanced the clock before the timer in Group starts.
-  while (cancelled.isPending()) {
+  while (leader.isPending()) {
     Clock::advance(timeout);
     Clock::settle();
   }
   Clock::resume();
 
-  // The detect operation times out but the group internally
-  // recreates a new ZooKeeper client and hides the error from the
-  // detector.
-  EXPECT_TRUE(leader.isPending());
+  // The detect operation times out.
+  EXPECT_NONE(leader.get());
+
+  // Re-detect.
+  leader = detector.detect(leader.get());
 
   Future<Nothing> connected = FUTURE_DISPATCH(
       group.process->self(),
       &GroupProcess::connected);
   server->startNetwork();
 
-  // When the service is restored, all sessions/memberships are gone.
   AWAIT_READY(connected);
   AWAIT_READY(leader);
-  EXPECT_TRUE(leader.get().isNone());
+  EXPECT_SOME(leader.get());
+
+  // Wait until the old membership expires on ZK and re-detect.
+  // (Restarting the network doesn't delete the old znode automatically.)
+  AWAIT_READY(leader.get().get().cancelled());
+  leader = detector.detect(leader.get());
+  AWAIT_READY(leader);
+  EXPECT_NONE(leader.get());
 
   AWAIT_READY(group.join("member 1"));
 
@@ -246,10 +252,12 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
   EXPECT_SOME(leader.get());
 
   // Cancel the member and join another.
-  AWAIT_READY(group.cancel(leader.get().get()));
+  Future<bool> cancelled = group.cancel(leader.get().get());
+  AWAIT_READY(cancelled);
+  EXPECT_TRUE(cancelled.get());
   leader = detector.detect(leader.get());
   AWAIT_READY(leader);
-  EXPECT_TRUE(leader.get().isNone());
+  EXPECT_NONE(leader.get());
 
   AWAIT_READY(group.join("member 2"));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/zookeeper/detector.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.hpp b/src/zookeeper/detector.hpp
index 00fcf5e..73235c0 100644
--- a/src/zookeeper/detector.hpp
+++ b/src/zookeeper/detector.hpp
@@ -29,7 +29,8 @@ public:
   // A failed future is returned if the detector is unable to detect
   // the leading master due to a non-retryable error.
   // Note that the detector transparently tries to recover from
-  // retryable errors.
+  // retryable errors until the group session expires, in which case
+  // the returned Future is satisfied with None.
   // The future is never discarded unless it stays pending when the
   // detector destructs.
   //

http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index e21dc6f..72ebe69 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -430,7 +430,7 @@ void GroupProcess::timedout(int64_t sessionId)
     // The timer can be reset or replaced and 'zk' can be replaced
     // since this method was dispatched.
     LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
-                     << "(sessionId=" << std::hex << sessionId << ")";
+                 << "(sessionId=" << std::hex << sessionId << ")";
 
     // Locally determine that the current session has expired.
     expired();
@@ -450,7 +450,19 @@ void GroupProcess::expired()
     timer = None();
   }
 
-  // Invalidate the cache.
+  // From the group's local perspective all the memberships are
+  // gone so we need to update the watches.
+  // If the memberships still exist on ZooKeeper, they will be
+  // restored in the group after the group reconnects to ZK.
+  // This is a precaution against the possibility that the ZK
+  // connection is lost right after we recreate the ZK instance below
+  // or the entire ZK cluster goes down. The outage can last for a long
+  // time but the clients watching the group should be informed sooner.
+  memberships = set<Group::Membership>();
+  update();
+
+  // Invalidate the cache so that we'll sync with ZK after
+  // reconnection.
   memberships = None();
 
   // Set all owned memberships as cancelled.


[2/4] git commit: Improved Group to take label as an option.

Posted by vi...@apache.org.
Improved Group to take label as an option.

Review: https://reviews.apache.org/r/17171


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/326172ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/326172ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/326172ec

Branch: refs/heads/master
Commit: 326172ec4571de0952278f005d6cba64d4dfc120
Parents: 42946ed
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 16:18:32 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800

----------------------------------------------------------------------
 src/tests/group_tests.cpp   | 34 ++++++++++++++
 src/zookeeper/group.cpp     | 99 ++++++++++++++++++++++++++--------------
 src/zookeeper/group.hpp     | 43 ++++++++++++-----
 src/zookeeper/zookeeper.hpp | 29 ++++++------
 4 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index ac1942b..5d4240c 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -372,3 +372,37 @@ TEST_F(GroupTest, RetryableErrors)
   AWAIT_READY(cancellation);
   AWAIT_READY(connected);
 }
+
+
+TEST_F(GroupTest, LabelledGroup)
+{
+  Group group(server->connectString(), NO_TIMEOUT, "/test/");
+
+  // Join a group with label.
+  Future<Group::Membership> membership = group.join(
+      "hello world", std::string("testlabel"));
+
+  AWAIT_READY(membership);
+
+  Future<std::set<Group::Membership> > memberships = group.watch();
+
+  AWAIT_READY(memberships);
+  EXPECT_EQ(1u, memberships.get().size());
+  EXPECT_EQ(1u, memberships.get().count(membership.get()));
+
+  Future<std::string> data = group.data(membership.get());
+
+  AWAIT_EXPECT_EQ("hello world", data);
+
+  Future<bool> cancellation = group.cancel(membership.get());
+
+  AWAIT_EXPECT_EQ(true, cancellation);
+
+  memberships = group.watch(memberships.get());
+
+  AWAIT_READY(memberships);
+  EXPECT_EQ(0u, memberships.get().size());
+
+  ASSERT_TRUE(membership.get().cancelled().isReady());
+  ASSERT_TRUE(membership.get().cancelled().get());
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 72ebe69..a50da22 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -13,6 +13,7 @@
 #include <stout/none.hpp>
 #include <stout/numify.hpp>
 #include <stout/os.hpp>
+#include <stout/path.hpp>
 #include <stout/result.hpp>
 #include <stout/some.hpp>
 #include <stout/strings.hpp>
@@ -127,12 +128,14 @@ void GroupProcess::initialize()
 }
 
 
-Future<Group::Membership> GroupProcess::join(const string& data)
+Future<Group::Membership> GroupProcess::join(
+    const string& data,
+    const Option<string>& label)
 {
   if (error.isSome()) {
     return Failure(error.get());
   } else if (state != READY) {
-    Join* join = new Join(data);
+    Join* join = new Join(data, label);
     pending.joins.push(join);
     return join->promise.future();
   }
@@ -145,14 +148,14 @@ Future<Group::Membership> GroupProcess::join(const string& data)
   // client can assume a happens-before ordering of operations (i.e.,
   // the first request will happen before the second, etc).
 
-  Result<Group::Membership> membership = doJoin(data);
+  Result<Group::Membership> membership = doJoin(data, label);
 
   if (membership.isNone()) { // Try again later.
     if (!retrying) {
       delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
       retrying = true;
     }
-    Join* join = new Join(data);
+    Join* join = new Join(data, label);
     pending.joins.push(join);
     return join->promise.future();
   } else if (membership.isError()) {
@@ -526,7 +529,9 @@ void GroupProcess::deleted(const string& path)
 }
 
 
-Result<Group::Membership> GroupProcess::doJoin(const string& data)
+Result<Group::Membership> GroupProcess::doJoin(
+    const string& data,
+    const Option<string>& label)
 {
   CHECK_EQ(state, READY);
 
@@ -534,8 +539,12 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
   // the specified data as it's contents.
   string result;
 
-  int code = zk->create(znode + "/", data, acl,
-                        ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+  int code = zk->create(
+      znode + "/" + (label.isSome() ? (label.get() + "_") : ""),
+      data,
+      acl,
+      ZOO_SEQUENCE | ZOO_EPHEMERAL,
+      &result);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
     CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
@@ -551,19 +560,24 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
   memberships = None();
 
   // Save the sequence number but only grab the basename. Example:
-  // "/path/to/znode/0000000131" => "0000000131".
+  // "/path/to/znode/label_0000000131" => "label_0000000131".
   Try<string> basename = os::basename(result);
   if (basename.isError()) {
     return Error("Failed to get the sequence number: " + basename.error());
   }
 
-  Try<int32_t> sequence = numify<int32_t>(basename.get());
+  // Strip the label before grabbing the sequence number.
+  string node = label.isSome()
+      ? strings::remove(basename.get(), label.get() + "_")
+      : basename.get();
+
+  Try<int32_t> sequence = numify<int32_t>(node);
   CHECK_SOME(sequence);
 
   Promise<bool>* cancelled = new Promise<bool>();
   owned[sequence.get()] = cancelled;
 
-  return Group::Membership(sequence.get(), cancelled->future());
+  return Group::Membership(sequence.get(), label, cancelled->future());
 }
 
 
@@ -571,11 +585,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 {
   CHECK_EQ(state, READY);
 
-  Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
-  CHECK_SOME(sequence);
-
-  string path = znode + "/" + sequence.get();
+  string path = path::join(znode, zkBasename(membership));
 
   LOG(INFO) << "Trying to remove '" << path << "' in ZooKeeper";
 
@@ -614,11 +624,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
 {
   CHECK_EQ(state, READY);
 
-  Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
-  CHECK_SOME(sequence);
-
-  string path = znode + "/" + sequence.get();
+  string path = path::join(znode, zkBasename(membership));
 
   LOG(INFO) << "Trying to get '" << path << "' in ZooKeeper";
 
@@ -654,15 +660,21 @@ Try<bool> GroupProcess::cache()
     CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return false;
   } else if (code != ZOK) {
-    return Error("Non-retryable error attempting to get children of '" + znode +
-                 "' in ZooKeeper: " + zk->message(code));
+    return Error("Non-retryable error attempting to get children of '" + znode
+                 + "' in ZooKeeper: " + zk->message(code));
   }
 
-  // Convert results to sequence numbers.
-  set<int32_t> sequences;
+  // Convert results to sequence numbers and (optionally) labels.
+  hashmap<int32_t, Option<string> > sequences;
 
   foreach (const string& result, results) {
-    Try<int32_t> sequence = numify<int32_t>(result);
+    vector<string> tokens = strings::tokenize(result, "_");
+    Option<string> label = None();
+    if (tokens.size() > 1) {
+      label = tokens[0];
+    }
+
+    Try<int32_t> sequence = numify<int32_t>(tokens.back());
 
     // Skip it if it couldn't be converted to a number.
     if (sequence.isError()) {
@@ -671,39 +683,43 @@ Try<bool> GroupProcess::cache()
       continue;
     }
 
-    sequences.insert(sequence.get());
+    sequences[sequence.get()] = label;
   }
 
   // Cache current memberships, cancelling those that are now missing.
   set<Group::Membership> current;
 
   foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
-    if (sequences.count(sequence) == 0) {
+    if (!sequences.contains(sequence)) {
       cancelled->set(false);
       owned.erase(sequence); // Okay since iterating over a copy.
       delete cancelled;
     } else {
-      current.insert(Group::Membership(sequence, cancelled->future()));
+      current.insert(Group::Membership(
+          sequence, sequences[sequence], cancelled->future()));
+
       sequences.erase(sequence);
     }
   }
 
   foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(unowned)) {
-    if (sequences.count(sequence) == 0) {
+    if (!sequences.contains(sequence)) {
       cancelled->set(false);
       unowned.erase(sequence); // Okay since iterating over a copy.
       delete cancelled;
     } else {
-      current.insert(Group::Membership(sequence, cancelled->future()));
+      current.insert(Group::Membership(
+          sequence, sequences[sequence], cancelled->future()));
+
       sequences.erase(sequence);
     }
   }
 
   // Add any remaining (i.e., unexpected) sequences.
-  foreach (int32_t sequence, sequences) {
+  foreachpair (int32_t sequence, const Option<string>& label, sequences) {
     Promise<bool>* cancelled = new Promise<bool>();
     unowned[sequence] = cancelled;
-    current.insert(Group::Membership(sequence, cancelled->future()));
+    current.insert(Group::Membership(sequence, label, cancelled->future()));
   }
 
   memberships = current;
@@ -759,7 +775,7 @@ Try<bool> GroupProcess::sync()
   // Do joins.
   while (!pending.joins.empty()) {
     Join* join = pending.joins.front();
-    Result<Group::Membership> membership = doJoin(join->data);
+    Result<Group::Membership> membership = doJoin(join->data, join->label);
     if (membership.isNone()) {
       return false; // Try again later.
     } else if (membership.isError()) {
@@ -877,6 +893,17 @@ void GroupProcess::abort(const string& message)
 }
 
 
+string GroupProcess::zkBasename(const Group::Membership& membership)
+{
+  Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
+  CHECK_SOME(sequence);
+
+  return membership.label.isSome()
+      ? (membership.label.get() + "_" + sequence.get())
+      : sequence.get();
+}
+
+
 Group::Group(const string& servers,
              const Duration& timeout,
              const string& znode,
@@ -903,9 +930,11 @@ Group::~Group()
 }
 
 
-Future<Group::Membership> Group::join(const string& data)
+Future<Group::Membership> Group::join(
+    const string& data,
+    const Option<string>& label)
 {
-  return dispatch(process, &GroupProcess::join, data);
+  return dispatch(process, &GroupProcess::join, data, label);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 354229f..1ce1519 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -3,13 +3,16 @@
 
 #include <map>
 #include <set>
+#include <string>
 
 #include "process/future.hpp"
 #include "process/timer.hpp"
 
+#include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
+#include <stout/try.hpp>
 
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/url.hpp"
@@ -85,10 +88,13 @@ public:
   private:
     friend class GroupProcess; // Creates and manages memberships.
 
-    Membership(int32_t _sequence, const process::Future<bool>& cancelled)
-      : sequence(_sequence), cancelled_(cancelled) {}
+    Membership(int32_t _sequence,
+               const Option<std::string>& _label,
+               const process::Future<bool>& cancelled)
+      : sequence(_sequence), label(_label), cancelled_(cancelled) {}
 
     const int32_t sequence;
+    const Option<std::string> label;
     process::Future<bool> cancelled_;
   };
 
@@ -103,13 +109,16 @@ public:
 
   ~Group();
 
-  // Returns the result of trying to join a "group" in ZooKeeper. If
-  // succesful, an "owned" membership will be returned whose
-  // retrievable data will be a copy of the specified parameter. A
-  // membership is not "renewed" in the event of a ZooKeeper session
-  // expiration. Instead, a client should watch the group memberships
-  // and rejoin the group as appropriate.
-  process::Future<Membership> join(const std::string& data);
+  // Returns the result of trying to join a "group" in ZooKeeper.
+  // If "label" is provided, the newly created znode contains "label_"
+  // as the prefix. If join is successful, an "owned" membership will
+  // be returned whose retrievable data will be a copy of the
+  // specified parameter. A membership is not "renewed" in the event
+  // of a ZooKeeper session expiration. Instead, a client should watch
+  // the group memberships and rejoin the group as appropriate.
+  process::Future<Membership> join(
+      const std::string& data,
+      const Option<std::string>& label = None());
 
   // Returns the result of trying to cancel a membership. Note that
   // only memberships that are "owned" (see join) can be canceled.
@@ -150,8 +159,14 @@ public:
 
   static const Duration RETRY_INTERVAL;
 
+  // Helper function that returns the basename of the znode of
+  // the membership.
+  static std::string zkBasename(const Group::Membership& membership);
+
   // Group implementation.
-  process::Future<Group::Membership> join(const std::string& data);
+  process::Future<Group::Membership> join(
+      const std::string& data,
+      const Option<std::string>& label);
   process::Future<bool> cancel(const Group::Membership& membership);
   process::Future<std::string> data(const Group::Membership& membership);
   process::Future<std::set<Group::Membership> > watch(
@@ -167,7 +182,9 @@ public:
   void deleted(const std::string& path);
 
 private:
-  Result<Group::Membership> doJoin(const std::string& data);
+  Result<Group::Membership> doJoin(
+      const std::string& data,
+      const Option<std::string>& label);
   Result<bool> doCancel(const Group::Membership& membership);
   Result<std::string> doData(const Group::Membership& membership);
 
@@ -234,8 +251,10 @@ private:
 
   struct Join
   {
-    Join(const std::string& _data) : data(_data) {}
+    Join(const std::string& _data, const Option<std::string>& _label)
+      : data(_data), label(_label) {}
     std::string data;
+    const Option<std::string> label;
     process::Promise<Group::Membership> promise;
   };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 1b4e2ed..f50aca6 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -187,12 +187,13 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int create(const std::string &path,
-	     const std::string &data,
-	     const ACL_vector &acl,
-	     int flags,
-	     std::string *result,
-	     bool recursive = false);
+  int create(
+      const std::string &path,
+      const std::string &data,
+      const ACL_vector &acl,
+      int flags,
+      std::string *result,
+      bool recursive = false);
 
   /**
    * \brief delete a node in zookeeper synchronously.
@@ -253,10 +254,11 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int get(const std::string &path,
-	  bool watch,
-	  std::string *result,
-	  Stat *stat);
+  int get(
+      const std::string &path,
+      bool watch,
+      std::string *result,
+      Stat *stat);
 
   /**
    * \brief lists the children of a node synchronously.
@@ -274,9 +276,10 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int getChildren(const std::string &path,
-		  bool watch,
-		  std::vector<std::string> *results);
+  int getChildren(
+      const std::string &path,
+      bool watch,
+      std::vector<std::string> *results);
 
   /**
    * \brief sets the data associated with a node.


[3/4] git commit: Updated master and contender to write znode(s) in the new (labelled) format.

Posted by vi...@apache.org.
Updated master and contender to write znode(s) in the new (labelled)
format.

Review: https://reviews.apache.org/r/17173


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586e7eb6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586e7eb6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586e7eb6

Branch: refs/heads/master
Commit: 586e7eb65d6d376743725f64a481150171d44916
Parents: 4382a0f
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 20:12:46 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800

----------------------------------------------------------------------
 src/master/contender.cpp                      | 28 ++++++++-------
 src/master/contender.hpp                      | 12 ++++---
 src/master/master.cpp                         |  3 +-
 src/tests/master_contender_detector_tests.cpp | 41 ++++++++++++++--------
 src/tests/zookeeper_tests.cpp                 | 12 ++++---
 src/zookeeper/contender.cpp                   | 29 +++++++++------
 src/zookeeper/contender.hpp                   |  8 +++--
 7 files changed, 84 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
index 89e368b..e3b0737 100644
--- a/src/master/contender.cpp
+++ b/src/master/contender.cpp
@@ -22,6 +22,7 @@
 #include <stout/check.hpp>
 #include <stout/lambda.hpp>
 
+#include "master/constants.hpp"
 #include "master/contender.hpp"
 #include "master/master.hpp"
 
@@ -54,7 +55,7 @@ public:
   // Explicitely use 'initialize' since we're overloading below.
   using process::ProcessBase::initialize;
 
-  void initialize(const PID<Master>& master);
+  void initialize(const MasterInfo& masterInfo);
 
   // MasterContender implementation.
   virtual Future<Future<Nothing> > contend();
@@ -64,7 +65,7 @@ private:
   LeaderContender* contender;
 
   // The master this contender contends on behalf of.
-  Option<PID<Master> > master;
+  Option<MasterInfo> masterInfo;
   Option<Future<Future<Nothing> > > candidacy;
 };
 
@@ -109,8 +110,7 @@ StandaloneMasterContender::~StandaloneMasterContender()
 }
 
 
-void StandaloneMasterContender::initialize(
-    const PID<master::Master>& master)
+void StandaloneMasterContender::initialize(const MasterInfo& masterInfo)
 {
   // We don't really need to store the master in this basic
   // implementation so we just restore an 'initialized' flag to make
@@ -161,10 +161,9 @@ ZooKeeperMasterContender::~ZooKeeperMasterContender()
 }
 
 
-void ZooKeeperMasterContender::initialize(
-    const PID<master::Master>& master)
+void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
 {
-  process->initialize(master);
+  process->initialize(masterInfo);
 }
 
 
@@ -191,16 +190,15 @@ ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
   delete contender;
 }
 
-void ZooKeeperMasterContenderProcess::initialize(
-    const PID<Master>& _master)
+void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
 {
-  master = _master;
+  masterInfo = _masterInfo;
 }
 
 
 Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
 {
-  if (master.isNone()) {
+  if (masterInfo.isNone()) {
     return Failure("Initialize the contender first");
   }
 
@@ -214,7 +212,13 @@ Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
     delete contender;
   }
 
-  contender = new LeaderContender(group.get(), master.get());
+  // Serialize the MasterInfo to string.
+  string data;
+  if (!masterInfo.get().SerializeToString(&data)) {
+    return Failure("Failed to serialize data to MasterInfo");
+  }
+
+  contender = new LeaderContender(group.get(), data, master::MASTER_INFO_LABEL);
   candidacy = contender->contend();
   return candidacy.get();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender.hpp b/src/master/contender.hpp
index 2a7e7c4..0048ee0 100644
--- a/src/master/contender.hpp
+++ b/src/master/contender.hpp
@@ -27,6 +27,8 @@
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 
+#include "messages/messages.hpp"
+
 #include "zookeeper/contender.hpp"
 #include "zookeeper/group.hpp"
 #include "zookeeper/url.hpp"
@@ -63,9 +65,9 @@ public:
   // to be cancelled during destruction.
   virtual ~MasterContender() = 0;
 
-  // Initializes the contender with the PID of the master it contends
-  // on behalf of.
-  virtual void initialize(const process::PID<master::Master>& master) = 0;
+  // Initializes the contender with the MasterInfo of the master it
+  // contends on behalf of.
+  virtual void initialize(const MasterInfo& masterInfo) = 0;
 
   // Returns a Future<Nothing> once the contender has entered the
   // contest (by obtaining a membership) and an error otherwise.
@@ -94,7 +96,7 @@ public:
   virtual ~StandaloneMasterContender();
 
   // MasterContender implementation.
-  virtual void initialize(const process::PID<master::Master>& master);
+  virtual void initialize(const MasterInfo& masterInfo);
 
   // In this basic implementation the outer Future directly returns
   // and inner Future stays pending because there is only one
@@ -118,7 +120,7 @@ public:
   virtual ~ZooKeeperMasterContender();
 
   // MasterContender implementation.
-  virtual void initialize(const process::PID<master::Master>& master);
+  virtual void initialize(const MasterInfo& masterInfo);
   virtual process::Future<process::Future<Nothing> > contend();
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 008033e..c7d9186 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -294,6 +294,7 @@ void Master::initialize()
   info.set_id(id.get());
   info.set_ip(self().ip);
   info.set_port(self().port);
+  info.set_pid(self());
 
   LOG(INFO) << "Master ID: " << info.id();
 
@@ -543,7 +544,7 @@ void Master::initialize()
     }
   }
 
-  contender->initialize(self());
+  contender->initialize(info);
 
   // Start contending to be a leading master and detecting the current
   // leader.

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index a739f89..5223200 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -80,6 +80,19 @@ using testing::AtMost;
 using testing::Return;
 
 
+// Helper function that creates a MasterInfo from PID<Master>.
+static MasterInfo createMasterInfo(const PID<Master>& master)
+{
+  MasterInfo masterInfo;
+  masterInfo.set_id(UUID::random().toString());
+  masterInfo.set_ip(master.ip);
+  masterInfo.set_port(master.port);
+  masterInfo.set_pid(master);
+
+  return masterInfo;
+}
+
+
 class MasterContenderDetectorTest : public MesosTest {};
 
 
@@ -133,7 +146,7 @@ TEST(BasicMasterContenderDetectorTest, Contender)
 
   MasterContender* contender = new StandaloneMasterContender();
 
-  contender->initialize(master);
+  contender->initialize(createMasterInfo(master));
 
   Future<Future<Nothing> > contended = contender->contend();
   AWAIT_READY(contended);
@@ -190,7 +203,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
   master.ip = 10000000;
   master.port = 10000;
 
-  contender->initialize(master);
+  contender->initialize(createMasterInfo(master));
   Future<Future<Nothing> > contended = contender->contend();
   AWAIT_READY(contended);
 
@@ -228,7 +241,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
   master.ip = 10000000;
   master.port = 10000;
 
-  contender.initialize(master);
+  contender.initialize(createMasterInfo(master));
 
   // Drop Group::join so that 'contended' will stay pending.
   Future<Nothing> join = DROP_DISPATCH(_, &GroupProcess::join);
@@ -281,7 +294,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
   master1.ip = 10000000;
   master1.port = 10000;
 
-  contender1->initialize(master1);
+  contender1->initialize(createMasterInfo(master1));
 
   Future<Future<Nothing> > contended1 = contender1->contend();
   AWAIT_READY(contended1);
@@ -298,7 +311,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
   master2.ip = 10000001;
   master2.port = 10001;
 
-  contender2.initialize(master2);
+  contender2.initialize(createMasterInfo(master2));
 
   Future<Future<Nothing> > contended2 = contender2.contend();
   AWAIT_READY(contended2);
@@ -343,7 +356,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
       "/mesos",
       zookeeper::Authentication("digest", "member:wrongpass")));
   ZooKeeperMasterContender contender(group2);
-  contender.initialize(master);
+  contender.initialize(createMasterInfo(master));
 
   // Fails due to authentication error.
   AWAIT_FAILED(contender.contend());
@@ -399,7 +412,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
   master.ip = 10000000;
   master.port = 10000;
 
-  contender.initialize(master);
+  contender.initialize(createMasterInfo(master));
 
   Future<Future<Nothing> > contended = contender.contend();
   AWAIT_READY(contended);
@@ -474,7 +487,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
   leader.ip = 10000000;
   leader.port = 10000;
 
-  leaderContender.initialize(leader);
+  leaderContender.initialize(createMasterInfo(leader));
 
   Future<Future<Nothing> > contended = leaderContender.contend();
   AWAIT_READY(contended);
@@ -494,7 +507,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
   follower.ip = 10000001;
   follower.port = 10001;
 
-  followerContender.initialize(follower);
+  followerContender.initialize(createMasterInfo(follower));
 
   contended = followerContender.contend();
   AWAIT_READY(contended);
@@ -586,7 +599,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
 
   ZooKeeperMasterContender leaderContender(group);
 
-  leaderContender.initialize(leader);
+  leaderContender.initialize(createMasterInfo(leader));
 
   Future<Future<Nothing> > leaderContended = leaderContender.contend();
   AWAIT_READY(leaderContended);
@@ -610,7 +623,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
 
   ZooKeeperMasterDetector followerDetector(url.get());
   ZooKeeperMasterContender followerContender(url.get());
-  followerContender.initialize(follower);
+  followerContender.initialize(createMasterInfo(follower));
 
   Future<Future<Nothing> > followerContended = followerContender.contend();
   AWAIT_READY(followerContended);
@@ -655,7 +668,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
   master.port = 10000;
 
   ZooKeeperMasterContender masterContender(url.get());
-  masterContender.initialize(master);
+  masterContender.initialize(createMasterInfo(master));
 
   Future<Future<Nothing> > leaderContended = masterContender.contend();
   AWAIT_READY(leaderContended);
@@ -714,7 +727,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
   leader.ip = 10000000;
   leader.port = 10000;
 
-  leaderContender.initialize(leader);
+  leaderContender.initialize(createMasterInfo(leader));
 
   Future<Future<Nothing> > contended = leaderContender.contend();
   AWAIT_READY(contended);
@@ -733,7 +746,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
   follower.ip = 10000001;
   follower.port = 10001;
 
-  followerContender.initialize(follower);
+  followerContender.initialize(createMasterInfo(follower));
 
   contended = followerContender.contend();
   AWAIT_READY(contended);

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index c7fa750..615338a 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -29,6 +29,8 @@
 #include <stout/gtest.hpp>
 #include <stout/strings.hpp>
 
+#include "master/constants.hpp"
+
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/contender.hpp"
 #include "zookeeper/detector.hpp"
@@ -274,7 +276,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
   Group group(server->connectString(), timeout, "/test/");
 
   Owned<LeaderContender> contender(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
 
   // Calling withdraw before contending returns 'false' because there
   // is nothing to withdraw.
@@ -292,7 +294,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Normal workflow.
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
 
   Future<Future<Nothing> > candidated = contender->contend();
   AWAIT_READY(candidated);
@@ -320,7 +322,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Contend again.
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
   candidated = contender->contend();
 
   AWAIT_READY(connected);
@@ -344,7 +346,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Contend (3) and shutdown the network this time.
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
   candidated = contender->contend();
   AWAIT_READY(candidated);
   lostCandidacy = candidated.get();
@@ -369,7 +371,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
 
   // Contend again (4).
   contender = Owned<LeaderContender>(
-      new LeaderContender(&group, "candidate 1"));
+      new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
   candidated = contender->contend();
   AWAIT_READY(candidated);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
index d8a5201..6710da4 100644
--- a/src/zookeeper/contender.cpp
+++ b/src/zookeeper/contender.cpp
@@ -8,6 +8,7 @@
 #include <stout/check.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
+#include <stout/some.hpp>
 
 #include "zookeeper/contender.hpp"
 #include "zookeeper/detector.hpp"
@@ -23,7 +24,11 @@ namespace zookeeper {
 class LeaderContenderProcess : public Process<LeaderContenderProcess>
 {
 public:
-  LeaderContenderProcess(Group* group, const std::string& data);
+  LeaderContenderProcess(
+      Group* group,
+      const string& data,
+      const Option<string>& label);
+
   virtual ~LeaderContenderProcess();
 
   // LeaderContender implementation.
@@ -45,6 +50,7 @@ private:
 
   Group* group;
   const string data;
+  const Option<string> label;
 
   // The contender's state transitions from contending -> watching ->
   // withdrawing or contending -> withdrawing. Each state is
@@ -70,9 +76,9 @@ private:
 
 LeaderContenderProcess::LeaderContenderProcess(
     Group* _group,
-    const string& _data)
-  : group(_group),
-    data(_data) {}
+    const string& _data,
+    const Option<string>& _label)
+  : group(_group), data(_data), label(_label) {}
 
 
 LeaderContenderProcess::~LeaderContenderProcess()
@@ -118,8 +124,8 @@ Future<Future<Nothing> > LeaderContenderProcess::contend()
     return Failure("Cannot contend more than once");
   }
 
-  LOG(INFO) << "Joining the ZK group with data: '" << data << "'";
-  candidacy = group->join(data);
+  LOG(INFO) << "Joining the ZK group";
+  candidacy = group->join(data, label);
   candidacy
     .onAny(defer(self(), &Self::joined));
 
@@ -234,8 +240,8 @@ void LeaderContenderProcess::joined()
     return;
   }
 
-  LOG(INFO) << "New candidate (id='" << candidacy.get().id() << "', data='"
-            << data << "') has entered the contest for leadership";
+  LOG(INFO) << "New candidate (id='" << candidacy.get().id()
+            << "') has entered the contest for leadership";
 
   // Transition to 'watching' state.
   watching = new Promise<Nothing>();
@@ -250,9 +256,12 @@ void LeaderContenderProcess::joined()
 }
 
 
-LeaderContender::LeaderContender(Group* group, const string& data)
+LeaderContender::LeaderContender(
+    Group* group,
+    const string& data,
+    const Option<string>& label)
 {
-  process = new LeaderContenderProcess(group, data);
+  process = new LeaderContenderProcess(group, data, label);
   spawn(process);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/zookeeper/contender.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.hpp b/src/zookeeper/contender.hpp
index e526e2d..6529245 100644
--- a/src/zookeeper/contender.hpp
+++ b/src/zookeeper/contender.hpp
@@ -6,6 +6,7 @@
 #include <process/future.hpp>
 
 #include <stout/nothing.hpp>
+#include <stout/option.hpp>
 
 #include "zookeeper/group.hpp"
 
@@ -25,8 +26,11 @@ class LeaderContender
 public:
   // The specified 'group' is expected to outlive the contender. The
   // specified 'data' is associated with the group membership created
-  // by this contender.
-  LeaderContender(Group* group, const std::string& data);
+  // by this contender. 'label' indicates the label for the znode that
+  // stores the 'data'.
+  LeaderContender(Group* group,
+                  const std::string& data,
+                  const Option<std::string>& label);
 
   // Note that the contender's membership, if obtained, is scheduled
   // to be cancelled during destruction.


[4/4] git commit: Updated detector to understand old (unlabelled) and new (labelled) master znode formats.

Posted by vi...@apache.org.
Updated detector to understand old (unlabelled) and new (labelled)
master znode formats.

Review: https://reviews.apache.org/r/17172


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4382a0f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4382a0f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4382a0f6

Branch: refs/heads/master
Commit: 4382a0f63c29145bc540e6abb1f9f289fb8b1311
Parents: 326172e
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 20:11:25 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto  |  1 +
 src/master/constants.cpp   |  3 +++
 src/master/constants.hpp   |  5 +++++
 src/master/detector.cpp    | 41 +++++++++++++++++++++++++++++++++++------
 src/zookeeper/detector.cpp |  2 +-
 src/zookeeper/group.cpp    |  4 ++--
 src/zookeeper/group.hpp    |  9 +++++++--
 7 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 655f867..1503e73 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -160,6 +160,7 @@ message MasterInfo {
   required string id = 1;
   required uint32 ip = 2;
   required uint32 port = 3 [default = 5050];
+  optional string pid = 4;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/constants.cpp
----------------------------------------------------------------------
diff --git a/src/master/constants.cpp b/src/master/constants.cpp
index 0b7c9f7..8a48bbb 100644
--- a/src/master/constants.cpp
+++ b/src/master/constants.cpp
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include <string>
+
 #include <stout/bytes.hpp>
 
 #include "master/constants.hpp"
@@ -33,6 +35,7 @@ const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
 const uint32_t MAX_COMPLETED_TASKS_PER_FRAMEWORK = 1000;
 const Duration WHITELIST_WATCH_INTERVAL = Seconds(5);
 const uint32_t TASK_LIMIT = 100;
+const std::string MASTER_INFO_LABEL = "info";
 
 } // namespace mesos {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/constants.hpp
----------------------------------------------------------------------
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index 8498c9b..cdaaad0 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -21,6 +21,8 @@
 
 #include <stdint.h>
 
+#include <string>
+
 #include <stout/duration.hpp>
 
 namespace mesos {
@@ -65,6 +67,9 @@ extern const Duration WHITELIST_WATCH_INTERVAL;
 // Default number of tasks (limit) for /master/tasks.json endpoint
 extern const uint32_t TASK_LIMIT;
 
+// Label used by the Leader Contender and Detector.
+extern const std::string MASTER_INFO_LABEL;
+
 } // namespace mesos {
 } // namespace internal {
 } // namespace master {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index cf337cf..2b169c5 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -29,8 +29,11 @@
 #include <stout/foreach.hpp>
 #include <stout/lambda.hpp>
 
+#include "master/constants.hpp"
 #include "master/detector.hpp"
 
+#include "messages/messages.hpp"
+
 #include "zookeeper/detector.hpp"
 #include "zookeeper/group.hpp"
 #include "zookeeper/url.hpp"
@@ -46,7 +49,6 @@ namespace internal {
 
 const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
 
-
 class StandaloneMasterDetectorProcess
   : public Process<StandaloneMasterDetectorProcess>
 {
@@ -81,7 +83,7 @@ private:
   void detected(const Future<Option<Group::Membership> >& leader);
 
   // Invoked when we have fetched the data associated with the leader.
-  void fetched(const Future<string>& data);
+  void fetched(const Group::Membership& membership, const Future<string>& data);
 
   Owned<Group> group;
   LeaderDetector detector;
@@ -294,7 +296,7 @@ void ZooKeeperMasterDetectorProcess::detected(
   } else {
     // Fetch the data associated with the leader.
     group->data(_leader.get().get())
-      .onAny(defer(self(), &Self::fetched, lambda::_1));
+      .onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
   }
 
   // Keep trying to detect leadership changes.
@@ -303,7 +305,9 @@ void ZooKeeperMasterDetectorProcess::detected(
 }
 
 
-void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
+void ZooKeeperMasterDetectorProcess::fetched(
+    const Group::Membership& membership,
+    const Future<string>& data)
 {
   CHECK(!data.isDiscarded());
 
@@ -317,8 +321,33 @@ void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
     return;
   }
 
-  // Cache the master for subsequent requests.
-  leader = UPID(data.get());
+  // Parse the data based on the membership label and cache the
+  // leader for subsequent requests.
+  Option<string> label = membership.label();
+  if (label.isNone()) {
+    leader = UPID(data.get());
+  } else if (label.isSome() && label.get() == master::MASTER_INFO_LABEL) {
+    MasterInfo info;
+    if (!info.ParseFromString(data.get())) {
+      leader = None();
+      foreach (Promise<Option<UPID> >* promise, promises) {
+        promise->fail("Failed to parse data into MasterInfo");
+        delete promise;
+      }
+      promises.clear();
+      return;
+    }
+    leader = UPID(info.pid());
+  } else {
+    leader = None();
+    foreach (Promise<Option<UPID> >* promise, promises) {
+      promise->fail("Failed to parse data of unknown label " + label.get());
+      delete promise;
+    }
+    promises.clear();
+    return;
+  }
+
   LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
 
   foreach (Promise<Option<UPID> >* promise, promises) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/detector.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.cpp b/src/zookeeper/detector.cpp
index 2759a2f..e186e51 100644
--- a/src/zookeeper/detector.cpp
+++ b/src/zookeeper/detector.cpp
@@ -133,7 +133,7 @@ void LeaderDetectorProcess::watched(const Future<set<Group::Membership> >& membe
   if (current != leader) {
     LOG(INFO) << "Detected a new leader: "
               << (current.isSome()
-                  ? "'(id='" + stringify(current.get().id()) + "')"
+                  ? "(id='" + stringify(current.get().id()) + "')"
                   : "None");
 
     foreach (Promise<Option<Group::Membership> >* promise, promises) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index a50da22..ecb6c00 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -898,8 +898,8 @@ string GroupProcess::zkBasename(const Group::Membership& membership)
   Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
   CHECK_SOME(sequence);
 
-  return membership.label.isSome()
-      ? (membership.label.get() + "_" + sequence.get())
+  return membership.label_.isSome()
+      ? (membership.label_.get() + "_" + sequence.get())
       : sequence.get();
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 1ce1519..e51ebb2 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -74,6 +74,11 @@ public:
       return sequence;
     }
 
+    Option<std::string> label() const
+    {
+      return label_;
+    }
+
     // Returns a future that is only satisfied once this membership
     // has been cancelled. In which case, the value of the future is
     // true if you own this membership and cancelled it by invoking
@@ -91,10 +96,10 @@ public:
     Membership(int32_t _sequence,
                const Option<std::string>& _label,
                const process::Future<bool>& cancelled)
-      : sequence(_sequence), label(_label), cancelled_(cancelled) {}
+      : sequence(_sequence), label_(_label), cancelled_(cancelled) {}
 
     const int32_t sequence;
-    const Option<std::string> label;
+    const Option<std::string> label_;
     process::Future<bool> cancelled_;
   };