You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/01/23 00:59:47 UTC
[4/4] git commit: Updated detector to understand old (unlabelled) and
new (labelled) master znode formats.
Updated detector to understand old (unlabelled) and new (labelled)
master znode formats.
Review: https://reviews.apache.org/r/17172
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4382a0f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4382a0f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4382a0f6
Branch: refs/heads/master
Commit: 4382a0f63c29145bc540e6abb1f9f289fb8b1311
Parents: 326172e
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 20:11:25 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800
----------------------------------------------------------------------
include/mesos/mesos.proto | 1 +
src/master/constants.cpp | 3 +++
src/master/constants.hpp | 5 +++++
src/master/detector.cpp | 41 +++++++++++++++++++++++++++++++++++------
src/zookeeper/detector.cpp | 2 +-
src/zookeeper/group.cpp | 4 ++--
src/zookeeper/group.hpp | 9 +++++++--
7 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 655f867..1503e73 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -160,6 +160,7 @@ message MasterInfo {
required string id = 1;
required uint32 ip = 2;
required uint32 port = 3 [default = 5050];
+ optional string pid = 4;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/constants.cpp
----------------------------------------------------------------------
diff --git a/src/master/constants.cpp b/src/master/constants.cpp
index 0b7c9f7..8a48bbb 100644
--- a/src/master/constants.cpp
+++ b/src/master/constants.cpp
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include <string>
+
#include <stout/bytes.hpp>
#include "master/constants.hpp"
@@ -33,6 +35,7 @@ const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
const uint32_t MAX_COMPLETED_TASKS_PER_FRAMEWORK = 1000;
const Duration WHITELIST_WATCH_INTERVAL = Seconds(5);
const uint32_t TASK_LIMIT = 100;
+const std::string MASTER_INFO_LABEL = "info";
} // namespace mesos {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/constants.hpp
----------------------------------------------------------------------
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index 8498c9b..cdaaad0 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -21,6 +21,8 @@
#include <stdint.h>
+#include <string>
+
#include <stout/duration.hpp>
namespace mesos {
@@ -65,6 +67,9 @@ extern const Duration WHITELIST_WATCH_INTERVAL;
// Default number of tasks (limit) for /master/tasks.json endpoint
extern const uint32_t TASK_LIMIT;
+// Label used by the Leader Contender and Detector.
+extern const std::string MASTER_INFO_LABEL;
+
} // namespace mesos {
} // namespace internal {
} // namespace master {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index cf337cf..2b169c5 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -29,8 +29,11 @@
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
+#include "master/constants.hpp"
#include "master/detector.hpp"
+#include "messages/messages.hpp"
+
#include "zookeeper/detector.hpp"
#include "zookeeper/group.hpp"
#include "zookeeper/url.hpp"
@@ -46,7 +49,6 @@ namespace internal {
const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
-
class StandaloneMasterDetectorProcess
: public Process<StandaloneMasterDetectorProcess>
{
@@ -81,7 +83,7 @@ private:
void detected(const Future<Option<Group::Membership> >& leader);
// Invoked when we have fetched the data associated with the leader.
- void fetched(const Future<string>& data);
+ void fetched(const Group::Membership& membership, const Future<string>& data);
Owned<Group> group;
LeaderDetector detector;
@@ -294,7 +296,7 @@ void ZooKeeperMasterDetectorProcess::detected(
} else {
// Fetch the data associated with the leader.
group->data(_leader.get().get())
- .onAny(defer(self(), &Self::fetched, lambda::_1));
+ .onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
}
// Keep trying to detect leadership changes.
@@ -303,7 +305,9 @@ void ZooKeeperMasterDetectorProcess::detected(
}
-void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
+void ZooKeeperMasterDetectorProcess::fetched(
+ const Group::Membership& membership,
+ const Future<string>& data)
{
CHECK(!data.isDiscarded());
@@ -317,8 +321,33 @@ void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
return;
}
- // Cache the master for subsequent requests.
- leader = UPID(data.get());
+ // Parse the data based on the membership label and cache the
+ // leader for subsequent requests.
+ Option<string> label = membership.label();
+ if (label.isNone()) {
+ leader = UPID(data.get());
+ } else if (label.isSome() && label.get() == master::MASTER_INFO_LABEL) {
+ MasterInfo info;
+ if (!info.ParseFromString(data.get())) {
+ leader = None();
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->fail("Failed to parse data into MasterInfo");
+ delete promise;
+ }
+ promises.clear();
+ return;
+ }
+ leader = UPID(info.pid());
+ } else {
+ leader = None();
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->fail("Failed to parse data of unknown label " + label.get());
+ delete promise;
+ }
+ promises.clear();
+ return;
+ }
+
LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
foreach (Promise<Option<UPID> >* promise, promises) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/detector.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.cpp b/src/zookeeper/detector.cpp
index 2759a2f..e186e51 100644
--- a/src/zookeeper/detector.cpp
+++ b/src/zookeeper/detector.cpp
@@ -133,7 +133,7 @@ void LeaderDetectorProcess::watched(const Future<set<Group::Membership> >& membe
if (current != leader) {
LOG(INFO) << "Detected a new leader: "
<< (current.isSome()
- ? "'(id='" + stringify(current.get().id()) + "')"
+ ? "(id='" + stringify(current.get().id()) + "')"
: "None");
foreach (Promise<Option<Group::Membership> >* promise, promises) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index a50da22..ecb6c00 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -898,8 +898,8 @@ string GroupProcess::zkBasename(const Group::Membership& membership)
Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
CHECK_SOME(sequence);
- return membership.label.isSome()
- ? (membership.label.get() + "_" + sequence.get())
+ return membership.label_.isSome()
+ ? (membership.label_.get() + "_" + sequence.get())
: sequence.get();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 1ce1519..e51ebb2 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -74,6 +74,11 @@ public:
return sequence;
}
+ Option<std::string> label() const
+ {
+ return label_;
+ }
+
// Returns a future that is only satisfied once this membership
// has been cancelled. In which case, the value of the future is
// true if you own this membership and cancelled it by invoking
@@ -91,10 +96,10 @@ public:
Membership(int32_t _sequence,
const Option<std::string>& _label,
const process::Future<bool>& cancelled)
- : sequence(_sequence), label(_label), cancelled_(cancelled) {}
+ : sequence(_sequence), label_(_label), cancelled_(cancelled) {}
const int32_t sequence;
- const Option<std::string> label;
+ const Option<std::string> label_;
process::Future<bool> cancelled_;
};