You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/04/25 23:33:37 UTC

[10/11] mesos git commit: Added SUBSCRIBE call and SUBSCRIBED event.

Added SUBSCRIBE call and SUBSCRIBED event.

Review: https://reviews.apache.org/r/32844


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb3c958e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb3c958e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb3c958e

Branch: refs/heads/master
Commit: eb3c958e3932a823115a502c6b0e57a29b54bf94
Parents: 94cb038
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 14:35:28 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto         | 22 ++++----
 src/examples/low_level_scheduler_libprocess.cpp | 28 +++-------
 src/examples/low_level_scheduler_pthread.cpp    | 28 +++-------
 src/master/master.cpp                           | 13 +++--
 src/scheduler/scheduler.cpp                     | 57 +++++++++-----------
 src/tests/scheduler_tests.cpp                   | 24 ++++-----
 6 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 7a77fe1..aea5607 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -34,8 +34,7 @@ message Event {
   // Possible event types, followed by message definitions if
   // applicable.
   enum Type {
-    REGISTERED = 1;
-    REREGISTERED = 2;
+    SUBSCRIBED = 1;
     OFFERS = 3;
     RESCIND = 4;
     UPDATE = 5;
@@ -44,11 +43,8 @@ message Event {
     ERROR = 8;
   }
 
-  message Registered {
-    required FrameworkID framework_id = 1;
-  }
-
-  message Reregistered {
+  // First event received when the scheduler subscribes.
+  message Subscribed {
     required FrameworkID framework_id = 1;
   }
 
@@ -93,8 +89,7 @@ message Event {
   // present if that type has a nested message definition.
   required Type type = 1;
 
-  optional Registered registered = 2;
-  optional Reregistered reregistered = 3;
+  optional Subscribed subscribed = 2;
   optional Offers offers = 4;
   optional Rescind rescind = 5;
   optional Update update = 6;
@@ -114,8 +109,7 @@ message Call {
   // Possible call types, followed by message definitions if
   // applicable.
   enum Type {
-    REGISTER = 1;
-    REREGISTER = 2;
+    SUBSCRIBE = 1;    // See 'framework_info' below.
     UNREGISTER = 3;
     REVIVE = 6;
     DECLINE = 5;
@@ -127,7 +121,7 @@ message Call {
     SHUTDOWN = 13;
 
     // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
-    // already registered frameworks as a way of stopping offers from
+    // already subscribed frameworks as a way of stopping offers from
     // being generated and other events from being sent by the master.
     // Note that this functionality existed originally to support
     // SchedulerDriver::abort which was only necessary to handle
@@ -214,7 +208,9 @@ message Call {
 
   // Identifies who generated this call. Always necessary, but the
   // only thing that needs to be set for certain calls, e.g.,
-  // REGISTER, REREGISTER, and UNREGISTER.
+  // SUBSCRIBE and UNREGISTER. 'framework_info.id()' must always be
+  // set except when a brand new scheduler SUBSCRIBEs for the very
+  // first time.
   required FrameworkInfo framework_info = 1;
 
   // Type of the call, indicates which optional field below should be

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index fbfb0f7..cf85c76 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -117,24 +117,13 @@ public:
       events.pop();
 
       switch (event.type()) {
-        case Event::REGISTERED: {
-          cout << endl << "Received a REGISTERED event" << endl;
+        case Event::SUBSCRIBED: {
+          cout << endl << "Received a SUBSCRIBED event" << endl;
 
-          framework.mutable_id()->CopyFrom(event.registered().framework_id());
-          state = REGISTERED;
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+          state = SUBSCRIBED;
 
-          cout << "Framework '" << event.registered().framework_id().value()
-               << "' registered" << endl;
-          break;
-        }
-
-        case Event::REREGISTERED: {
-          cout << endl << "Received a REREGISTERED event" << endl;
-
-          state = REGISTERED;
-
-          cout << "Framework '" << event.reregistered().framework_id().value()
-               << "' re-registered" << endl;
+          cout << "Subscribed with ID '" << framework.id() << endl;
           break;
         }
 
@@ -303,14 +292,13 @@ private:
 
   void doReliableRegistration()
   {
-    if (state == REGISTERED) {
+    if (state == SUBSCRIBED) {
       return;
     }
 
     Call call;
     call.mutable_framework_info()->CopyFrom(framework);
-    call.set_type(
-        state == INITIALIZING ? Call::REGISTER : Call::REREGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
 
@@ -334,7 +322,7 @@ private:
 
   enum State {
     INITIALIZING = 0,
-    REGISTERED = 1,
+    SUBSCRIBED = 1,
     DISCONNECTED = 2
   } state;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 944573d..4af576d 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -135,29 +135,16 @@ public:
       events.pop();
 
       switch (event.type()) {
-        case Event::REGISTERED: {
-          cout << endl << "Received a REGISTERED event" << endl;
+        case Event::SUBSCRIBED: {
+          cout << endl << "Received a SUBSCRIBED event" << endl;
 
           pthread_mutex_lock(&mutex);
-          state = REGISTERED;
+          state = SUBSCRIBED;
           pthread_mutex_unlock(&mutex);
 
-          framework.mutable_id()->CopyFrom(event.registered().framework_id());
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
 
-          cout << "Framework '" << event.registered().framework_id().value()
-               << "' registered" << endl;
-          break;
-        }
-
-        case Event::REREGISTERED: {
-          cout << endl << "Received a REREGISTERED event" << endl;
-
-          pthread_mutex_lock(&mutex);
-          state = REGISTERED;
-          pthread_mutex_unlock(&mutex);
-
-          cout << "Framework '" << event.reregistered().framework_id().value()
-               << "' re-registered" << endl;
+          cout << "Subscribed with ID '" << framework.id() << endl;
           break;
         }
 
@@ -362,8 +349,7 @@ private:
 
     Call call;
     call.mutable_framework_info()->CopyFrom(framework);
-    call.set_type(
-        state == CONNECTED ? Call::REGISTER : Call::REREGISTER);
+    call.set_type(Call::SUBSCRIBE);
     pthread_mutex_unlock(&mutex);
 
     mesos.send(call);
@@ -390,7 +376,7 @@ private:
   enum State {
     INITIALIZING = 0,
     CONNECTED = 1,
-    REGISTERED = 2,
+    SUBSCRIBED = 2,
     DISCONNECTED = 3,
     DONE = 4
   } state;

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2c161a9..502d3ba 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1555,15 +1555,13 @@ void Master::receive(
     const scheduler::Call& call)
 {
   // TODO(vinod): Add metrics for calls.
-
+  // TODO(vinod): Implement the unimplemented calls.
   const FrameworkInfo& frameworkInfo = call.framework_info();
 
-  // For REGISTER and REREGISTER calls, no need to look up the
-  // framework. Therefore, we handle them first and separately from
-  // other types of calls.
+  // For a SUBSCRIBE call there is no need to look up the framework.
+  // Therefore, we handle it first and separately from other types of calls.
   switch (call.type()) {
-    case scheduler::Call::REGISTER:
-    case scheduler::Call::REREGISTER:
+    case scheduler::Call::SUBSCRIBE:
       drop(from, call, "Unimplemented");
       return;
 
@@ -1586,7 +1584,8 @@ void Master::receive(
   }
 
   // TODO(jieyu): Validate frameworkInfo to make sure it's the same as
-  // the one that the framework used during registration.
+  // the one that the framework used during registration and that the
+  // framework id is set and non-empty except for a SUBSCRIBE call.
 
   switch (call.type()) {
     case scheduler::Call::UNREGISTER:

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 7d53d51..82cbfcb 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -200,9 +200,10 @@ public:
       call.mutable_framework_info()->set_user(user.get());
     }
 
-    // Only a REGISTER should not have set the framework ID.
-    if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
-      drop(call, "Call is mising FrameworkInfo.id");
+    // Only a SUBSCRIBE call may omit the framework ID.
+    if (call.type() != Call::SUBSCRIBE &&
+        (!call.framework_info().has_id() || call.framework_info().id() == "")) {
+      drop(call, "Call is missing FrameworkInfo.id");
       return;
     }
 
@@ -213,18 +214,18 @@ public:
     }
 
     switch (call.type()) {
-      case Call::REGISTER: {
-        RegisterFrameworkMessage message;
-        message.mutable_framework()->CopyFrom(call.framework_info());
-        send(master.get(), message);
-        break;
-      }
-
-      case Call::REREGISTER: {
-        ReregisterFrameworkMessage message;
-        message.mutable_framework()->CopyFrom(call.framework_info());
-        message.set_failover(failover);
-        send(master.get(), message);
+      case Call::SUBSCRIBE: {
+        if (!call.framework_info().has_id() ||
+            call.framework_info().id() == "") {
+          RegisterFrameworkMessage message;
+          message.mutable_framework()->CopyFrom(call.framework_info());
+          send(master.get(), message);
+        } else {
+          ReregisterFrameworkMessage message;
+          message.mutable_framework()->CopyFrom(call.framework_info());
+          message.set_failover(failover);
+          send(master.get(), message);
+        }
         break;
       }
 
@@ -386,6 +387,7 @@ protected:
       VLOG(1) << "New master detected at " << master.get();
 
       if (credential.isSome()) {
+        // TODO(vinod): Do pure HTTP Authentication instead of SASL.
         // Authenticate with the master.
         authenticate();
       } else {
@@ -572,34 +574,27 @@ protected:
 
   void receive(const UPID& from, const FrameworkRegisteredMessage& message)
   {
-    // We've now registered at least once with the master so we're no
-    // longer failing over. See the comment where 'failover' is
-    // declared for further details.
-    failover = false;
-
-    Event event;
-    event.set_type(Event::REGISTERED);
-
-    Event::Registered* registered = event.mutable_registered();
-
-    registered->mutable_framework_id()->CopyFrom(message.framework_id());
-
-    receive(from, event);
+    subscribed(from, message.framework_id());
   }
 
   void receive(const UPID& from, const FrameworkReregisteredMessage& message)
   {
+    subscribed(from, message.framework_id());
+  }
+
+  void subscribed(const UPID& from, const FrameworkID& frameworkId)
+  {
     // We've now registered at least once with the master so we're no
     // longer failing over. See the comment where 'failover' is
     // declared for further details.
     failover = false;
 
     Event event;
-    event.set_type(Event::REREGISTERED);
+    event.set_type(Event::SUBSCRIBED);
 
-    Event::Reregistered* reregistered = event.mutable_reregistered();
+    Event::Subscribed* subscribed = event.mutable_subscribed();
 
-    reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
+    subscribed->mutable_framework_id()->CopyFrom(frameworkId);
 
     receive(from, event);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index a59b146..ddbb712 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -142,16 +142,16 @@ TEST_F(SchedulerTest, TaskRunning)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -247,16 +247,16 @@ TEST_F(SchedulerTest, ReconcileTask)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -354,16 +354,16 @@ TEST_F(SchedulerTest, KillTask)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -478,16 +478,16 @@ TEST_F(SchedulerTest, ShutdownExecutor)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);