You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/04/25 23:33:37 UTC
[10/11] mesos git commit: Added SUBSCRIBE call and SUBSCRIBED event.
Added SUBSCRIBE call and SUBSCRIBED event.
Review: https://reviews.apache.org/r/32844
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb3c958e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb3c958e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb3c958e
Branch: refs/heads/master
Commit: eb3c958e3932a823115a502c6b0e57a29b54bf94
Parents: 94cb038
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 14:35:28 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 22 ++++----
src/examples/low_level_scheduler_libprocess.cpp | 28 +++-------
src/examples/low_level_scheduler_pthread.cpp | 28 +++-------
src/master/master.cpp | 13 +++--
src/scheduler/scheduler.cpp | 57 +++++++++-----------
src/tests/scheduler_tests.cpp | 24 ++++-----
6 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 7a77fe1..aea5607 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -34,8 +34,7 @@ message Event {
// Possible event types, followed by message definitions if
// applicable.
enum Type {
- REGISTERED = 1;
- REREGISTERED = 2;
+ SUBSCRIBED = 1;
OFFERS = 3;
RESCIND = 4;
UPDATE = 5;
@@ -44,11 +43,8 @@ message Event {
ERROR = 8;
}
- message Registered {
- required FrameworkID framework_id = 1;
- }
-
- message Reregistered {
+ // First event received when the scheduler subscribes.
+ message Subscribed {
required FrameworkID framework_id = 1;
}
@@ -93,8 +89,7 @@ message Event {
// present if that type has a nested message definition.
required Type type = 1;
- optional Registered registered = 2;
- optional Reregistered reregistered = 3;
+ optional Subscribed subscribed = 2;
optional Offers offers = 4;
optional Rescind rescind = 5;
optional Update update = 6;
@@ -114,8 +109,7 @@ message Call {
// Possible call types, followed by message definitions if
// applicable.
enum Type {
- REGISTER = 1;
- REREGISTER = 2;
+ SUBSCRIBE = 1; // See 'framework_info' below.
UNREGISTER = 3;
REVIVE = 6;
DECLINE = 5;
@@ -127,7 +121,7 @@ message Call {
SHUTDOWN = 13;
// TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
- // already registered frameworks as a way of stopping offers from
+ // already subscribed frameworks as a way of stopping offers from
// being generated and other events from being sent by the master.
// Note that this functionality existed originally to support
// SchedulerDriver::abort which was only necessary to handle
@@ -214,7 +208,9 @@ message Call {
// Identifies who generated this call. Always necessary, but the
// only thing that needs to be set for certain calls, e.g.,
- // REGISTER, REREGISTER, and UNREGISTER.
+ // SUBSCRIBE and UNREGISTER. 'framework_info.id()' must be always
+ // set except when a brand new scheduler SUBSCRIBEs for the very
+ // first time.
required FrameworkInfo framework_info = 1;
// Type of the call, indicates which optional field below should be
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index fbfb0f7..cf85c76 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -117,24 +117,13 @@ public:
events.pop();
switch (event.type()) {
- case Event::REGISTERED: {
- cout << endl << "Received a REGISTERED event" << endl;
+ case Event::SUBSCRIBED: {
+ cout << endl << "Received a SUBSCRIBED event" << endl;
- framework.mutable_id()->CopyFrom(event.registered().framework_id());
- state = REGISTERED;
+ framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+ state = SUBSCRIBED;
- cout << "Framework '" << event.registered().framework_id().value()
- << "' registered" << endl;
- break;
- }
-
- case Event::REREGISTERED: {
- cout << endl << "Received a REREGISTERED event" << endl;
-
- state = REGISTERED;
-
- cout << "Framework '" << event.reregistered().framework_id().value()
- << "' re-registered" << endl;
+ cout << "Subscribed with ID '" << framework.id() << endl;
break;
}
@@ -303,14 +292,13 @@ private:
void doReliableRegistration()
{
- if (state == REGISTERED) {
+ if (state == SUBSCRIBED) {
return;
}
Call call;
call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(
- state == INITIALIZING ? Call::REGISTER : Call::REREGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
@@ -334,7 +322,7 @@ private:
enum State {
INITIALIZING = 0,
- REGISTERED = 1,
+ SUBSCRIBED = 1,
DISCONNECTED = 2
} state;
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 944573d..4af576d 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -135,29 +135,16 @@ public:
events.pop();
switch (event.type()) {
- case Event::REGISTERED: {
- cout << endl << "Received a REGISTERED event" << endl;
+ case Event::SUBSCRIBED: {
+ cout << endl << "Received a SUBSCRIBED event" << endl;
pthread_mutex_lock(&mutex);
- state = REGISTERED;
+ state = SUBSCRIBED;
pthread_mutex_unlock(&mutex);
- framework.mutable_id()->CopyFrom(event.registered().framework_id());
+ framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
- cout << "Framework '" << event.registered().framework_id().value()
- << "' registered" << endl;
- break;
- }
-
- case Event::REREGISTERED: {
- cout << endl << "Received a REREGISTERED event" << endl;
-
- pthread_mutex_lock(&mutex);
- state = REGISTERED;
- pthread_mutex_unlock(&mutex);
-
- cout << "Framework '" << event.reregistered().framework_id().value()
- << "' re-registered" << endl;
+ cout << "Subscribed with ID '" << framework.id() << endl;
break;
}
@@ -362,8 +349,7 @@ private:
Call call;
call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(
- state == CONNECTED ? Call::REGISTER : Call::REREGISTER);
+ call.set_type(Call::SUBSCRIBE);
pthread_mutex_unlock(&mutex);
mesos.send(call);
@@ -390,7 +376,7 @@ private:
enum State {
INITIALIZING = 0,
CONNECTED = 1,
- REGISTERED = 2,
+ SUBSCRIBED = 2,
DISCONNECTED = 3,
DONE = 4
} state;
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2c161a9..502d3ba 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1555,15 +1555,13 @@ void Master::receive(
const scheduler::Call& call)
{
// TODO(vinod): Add metrics for calls.
-
+ // TODO(vinod): Implement the unimplemented calls.
const FrameworkInfo& frameworkInfo = call.framework_info();
- // For REGISTER and REREGISTER calls, no need to look up the
- // framework. Therefore, we handle them first and separately from
- // other types of calls.
+ // For SUBSCRIBE call, no need to look up the framework. Therefore,
+ // we handle them first and separately from other types of calls.
switch (call.type()) {
- case scheduler::Call::REGISTER:
- case scheduler::Call::REREGISTER:
+ case scheduler::Call::SUBSCRIBE:
drop(from, call, "Unimplemented");
return;
@@ -1586,7 +1584,8 @@ void Master::receive(
}
// TODO(jieyu): Validate frameworkInfo to make sure it's the same as
- // the one that the framework used during registration.
+ // the one that the framework used during registration and that the
+ // framework id is set and non-empty except for SUBSCRIBE call.
switch (call.type()) {
case scheduler::Call::UNREGISTER:
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 7d53d51..82cbfcb 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -200,9 +200,10 @@ public:
call.mutable_framework_info()->set_user(user.get());
}
- // Only a REGISTER should not have set the framework ID.
- if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
- drop(call, "Call is mising FrameworkInfo.id");
+ // Only a SUBSCRIBE call may not have set the framework ID.
+ if (call.type() != Call::SUBSCRIBE &&
+ (!call.framework_info().has_id() || call.framework_info().id() == "")) {
+ drop(call, "Call is missing FrameworkInfo.id");
return;
}
@@ -213,18 +214,18 @@ public:
}
switch (call.type()) {
- case Call::REGISTER: {
- RegisterFrameworkMessage message;
- message.mutable_framework()->CopyFrom(call.framework_info());
- send(master.get(), message);
- break;
- }
-
- case Call::REREGISTER: {
- ReregisterFrameworkMessage message;
- message.mutable_framework()->CopyFrom(call.framework_info());
- message.set_failover(failover);
- send(master.get(), message);
+ case Call::SUBSCRIBE: {
+ if (!call.framework_info().has_id() ||
+ call.framework_info().id() == "") {
+ RegisterFrameworkMessage message;
+ message.mutable_framework()->CopyFrom(call.framework_info());
+ send(master.get(), message);
+ } else {
+ ReregisterFrameworkMessage message;
+ message.mutable_framework()->CopyFrom(call.framework_info());
+ message.set_failover(failover);
+ send(master.get(), message);
+ }
break;
}
@@ -386,6 +387,7 @@ protected:
VLOG(1) << "New master detected at " << master.get();
if (credential.isSome()) {
+ // TODO(vinod): Do pure HTTP Authentication instead of SASL.
// Authenticate with the master.
authenticate();
} else {
@@ -572,34 +574,27 @@ protected:
void receive(const UPID& from, const FrameworkRegisteredMessage& message)
{
- // We've now registered at least once with the master so we're no
- // longer failing over. See the comment where 'failover' is
- // declared for further details.
- failover = false;
-
- Event event;
- event.set_type(Event::REGISTERED);
-
- Event::Registered* registered = event.mutable_registered();
-
- registered->mutable_framework_id()->CopyFrom(message.framework_id());
-
- receive(from, event);
+ subscribed(from, message.framework_id());
}
void receive(const UPID& from, const FrameworkReregisteredMessage& message)
{
+ subscribed(from, message.framework_id());
+ }
+
+ void subscribed(const UPID& from, const FrameworkID& frameworkId)
+ {
// We've now registered at least once with the master so we're no
// longer failing over. See the comment where 'failover' is
// declared for further details.
failover = false;
Event event;
- event.set_type(Event::REREGISTERED);
+ event.set_type(Event::SUBSCRIBED);
- Event::Reregistered* reregistered = event.mutable_reregistered();
+ Event::Subscribed* subscribed = event.mutable_subscribed();
- reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
+ subscribed->mutable_framework_id()->CopyFrom(frameworkId);
receive(from, event);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index a59b146..ddbb712 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -142,16 +142,16 @@ TEST_F(SchedulerTest, TaskRunning)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
@@ -247,16 +247,16 @@ TEST_F(SchedulerTest, ReconcileTask)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
@@ -354,16 +354,16 @@ TEST_F(SchedulerTest, KillTask)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
@@ -478,16 +478,16 @@ TEST_F(SchedulerTest, ShutdownExecutor)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);