You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/07/25 02:11:31 UTC

mesos git commit: Added helpers for converting scheduler messages to Events.

Repository: mesos
Updated Branches:
  refs/heads/master a6a3dcaa5 -> cb941fefc


Added helpers for converting scheduler messages to Events.

Review: https://reviews.apache.org/r/36803


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cb941fef
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cb941fef
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cb941fef

Branch: refs/heads/master
Commit: cb941fefcf73a557b8c0d25cc616ba75e88a6062
Parents: a6a3dca
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 24 16:24:54 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 24 17:10:57 2015 -0700

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp | 155 ++++++++++++++++++++++++++++++++++++-
 src/common/protobuf_utils.hpp |  19 +++++
 src/scheduler/scheduler.cpp   | 121 ++++-------------------------
 3 files changed, 186 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index d900707..90a2461 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <mesos/slave/isolator.hpp>
 
 #include <mesos/type_utils.hpp>
@@ -33,9 +35,13 @@
 
 using std::string;
 
+using mesos::scheduler::Event;
+
 using mesos::slave::ExecutorLimitation;
 using mesos::slave::ExecutorRunState;
 
+using process::UPID;
+
 namespace mesos {
 namespace internal {
 namespace protobuf {
@@ -177,7 +183,7 @@ Option<bool> getTaskHealth(const Task& task)
  * @return A fully formed `MasterInfo` with the IP/hostname information
  *    as derived from the `UPID`.
  */
-MasterInfo createMasterInfo(const process::UPID& pid)
+MasterInfo createMasterInfo(const UPID& pid)
 {
   MasterInfo info;
   info.set_id(stringify(pid) + "-" + UUID::random().toString());
@@ -240,6 +246,153 @@ ExecutorRunState createExecutorRunState(
 }
 
 } // namespace slave {
+
+namespace scheduler {
+
+Event event(const FrameworkRegisteredMessage& message)
+{
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+
+  Event::Subscribed* subscribed = event.mutable_subscribed();
+  subscribed->mutable_framework_id()->CopyFrom(message.framework_id());
+
+  return event;
+}
+
+
+Event event(const FrameworkReregisteredMessage& message)
+{
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+
+  Event::Subscribed* subscribed = event.mutable_subscribed();
+  subscribed->mutable_framework_id()->CopyFrom(message.framework_id());
+
+  return event;
+}
+
+
+Event event(const ResourceOffersMessage& message)
+{
+  Event event;
+  event.set_type(Event::OFFERS);
+
+  Event::Offers* offers = event.mutable_offers();
+  offers->mutable_offers()->CopyFrom(message.offers());
+
+  return event;
+}
+
+
+Event event(const RescindResourceOfferMessage& message)
+{
+  Event event;
+  event.set_type(Event::RESCIND);
+
+  Event::Rescind* rescind = event.mutable_rescind();
+  rescind->mutable_offer_id()->CopyFrom(message.offer_id());
+
+  return event;
+}
+
+
+Event event(const StatusUpdateMessage& message)
+{
+  Event event;
+  event.set_type(Event::UPDATE);
+
+  Event::Update* update = event.mutable_update();
+
+  update->mutable_status()->CopyFrom(message.update().status());
+
+  if (message.update().has_slave_id()) {
+    update->mutable_status()->mutable_slave_id()->CopyFrom(
+        message.update().slave_id());
+  }
+
+  if (message.update().has_executor_id()) {
+    update->mutable_status()->mutable_executor_id()->CopyFrom(
+        message.update().executor_id());
+  }
+
+  update->mutable_status()->set_timestamp(message.update().timestamp());
+
+  // If the update does not have a 'uuid', it does not need
+  // acknowledging. However, prior to 0.23.0, the update uuid
+  // was required and always set. In 0.24.0, we can rely on the
+  // update uuid check here, until then we must still check for
+  // this being sent from the driver (from == UPID()) or from
+  // the master (pid == UPID()).
+  //
+  // TODO(bmahler): For the HTTP API, we will have to update the
+  // master and slave to ensure the 'uuid' in TaskStatus is set
+  // correctly.
+  if (!message.update().has_uuid() || message.update().uuid() == "") {
+    update->mutable_status()->clear_uuid();
+  } else if (UPID(message.pid()) == UPID()) {
+    update->mutable_status()->clear_uuid();
+  } else {
+    update->mutable_status()->set_uuid(message.update().uuid());
+  }
+
+  return event;
+}
+
+
+Event event(const LostSlaveMessage& message)
+{
+  Event event;
+  event.set_type(Event::FAILURE);
+
+  Event::Failure* failure = event.mutable_failure();
+  failure->mutable_slave_id()->CopyFrom(message.slave_id());
+
+  return event;
+}
+
+
+Event event(const ExitedExecutorMessage& message)
+{
+  Event event;
+  event.set_type(Event::FAILURE);
+
+  Event::Failure* failure = event.mutable_failure();
+  failure->mutable_slave_id()->CopyFrom(message.slave_id());
+  failure->mutable_executor_id()->CopyFrom(message.executor_id());
+  failure->set_status(message.status());
+
+  return event;
+}
+
+
+Event event(const ExecutorToFrameworkMessage& message)
+{
+  Event event;
+  event.set_type(Event::MESSAGE);
+
+  Event::Message* message_ = event.mutable_message();
+  message_->mutable_slave_id()->CopyFrom(message.slave_id());
+  message_->mutable_executor_id()->CopyFrom(message.executor_id());
+  message_->set_data(message.data());
+
+  return event;
+}
+
+
+Event event(const FrameworkErrorMessage& message)
+{
+  Event event;
+  event.set_type(Event::ERROR);
+
+  Event::Error* error = event.mutable_error();
+  error->set_message(message.message());
+
+  return event;
+}
+
+} // namespace scheduler {
+
 } // namespace protobuf {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 22046ba..5c99254 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -21,6 +21,8 @@
 
 #include <string>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <mesos/slave/isolator.hpp>
 
 #include <stout/ip.hpp>
@@ -90,6 +92,23 @@ mesos::slave::ExecutorRunState createExecutorRunState(
     const Option<std::string>& rootfs);
 
 } // namespace slave {
+
+namespace scheduler {
+
+// Helper functions that create scheduler::Event from a message that
+// is sent to the scheduler.
+mesos::scheduler::Event event(const FrameworkRegisteredMessage& message);
+mesos::scheduler::Event event(const FrameworkReregisteredMessage& message);
+mesos::scheduler::Event event(const ResourceOffersMessage& message);
+mesos::scheduler::Event event(const RescindResourceOfferMessage& message);
+mesos::scheduler::Event event(const StatusUpdateMessage& message);
+mesos::scheduler::Event event(const LostSlaveMessage& message);
+mesos::scheduler::Event event(const ExitedExecutorMessage& message);
+mesos::scheduler::Event event(const ExecutorToFrameworkMessage& message);
+mesos::scheduler::Event event(const FrameworkErrorMessage& message);
+
+} // namespace scheduler {
+
 } // namespace protobuf {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index badc107..6887ed1 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -61,6 +61,7 @@
 
 #include "authentication/cram_md5/authenticatee.hpp"
 
+#include "common/protobuf_utils.hpp"
 
 #include "master/detector.hpp"
 
@@ -549,147 +550,51 @@ protected:
 
   void receive(const UPID& from, const FrameworkRegisteredMessage& message)
   {
-    subscribed(from, message.framework_id());
-  }
+    failover = false;
 
-  void receive(const UPID& from, const FrameworkReregisteredMessage& message)
-  {
-    subscribed(from, message.framework_id());
+    receive(from, protobuf::scheduler::event(message));
   }
 
-  void subscribed(const UPID& from, const FrameworkID& frameworkId)
+  void receive(const UPID& from, const FrameworkReregisteredMessage& message)
   {
-    // We've now registered at least once with the master so we're no
-    // longer failing over. See the comment where 'failover' is
-    // declared for further details.
     failover = false;
 
-    Event event;
-    event.set_type(Event::SUBSCRIBED);
-
-    Event::Subscribed* subscribed = event.mutable_subscribed();
-
-    subscribed->mutable_framework_id()->CopyFrom(frameworkId);
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   void receive(const UPID& from, const ResourceOffersMessage& message)
   {
-    Event event;
-    event.set_type(Event::OFFERS);
-
-    Event::Offers* offers = event.mutable_offers();
-
-    offers->mutable_offers()->CopyFrom(message.offers());
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   void receive(const UPID& from, const RescindResourceOfferMessage& message)
   {
-    Event event;
-    event.set_type(Event::RESCIND);
-
-    Event::Rescind* rescind = event.mutable_rescind();
-
-    rescind->mutable_offer_id()->CopyFrom(message.offer_id());
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   void receive(const UPID& from, const StatusUpdateMessage& message)
   {
-    Event event;
-    event.set_type(Event::UPDATE);
-
-    Event::Update* update = event.mutable_update();
-
-    update->mutable_status()->CopyFrom(message.update().status());
-
-    if (message.update().has_slave_id()) {
-      update->mutable_status()->mutable_slave_id()->CopyFrom(
-          message.update().slave_id());
-    }
-
-    if (message.update().has_executor_id()) {
-      update->mutable_status()->mutable_executor_id()->CopyFrom(
-          message.update().executor_id());
-    }
-
-    update->mutable_status()->set_timestamp(message.update().timestamp());
-
-    // If the update does not have a 'uuid', it does not need
-    // acknowledging. However, prior to 0.23.0, the update uuid
-    // was required and always set. In 0.24.0, we can rely on the
-    // update uuid check here, until then we must still check for
-    // this being sent from the driver (from == UPID()) or from
-    // the master (pid == UPID()).
-    //
-    // TODO(bmahler): For the HTTP API, we will have to update the
-    // master and slave to ensure the 'uuid' in TaskStatus is set
-    // correctly.
-    if (!message.update().has_uuid() || message.update().uuid() == "") {
-      update->mutable_status()->clear_uuid();
-    } else if (UPID(message.pid()) == UPID()) {
-      update->mutable_status()->clear_uuid();
-    } else {
-      update->mutable_status()->set_uuid(message.update().uuid());
-    }
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   void receive(const UPID& from, const LostSlaveMessage& message)
   {
-    Event event;
-    event.set_type(Event::FAILURE);
-
-    Event::Failure* failure = event.mutable_failure();
-
-    failure->mutable_slave_id()->CopyFrom(message.slave_id());
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   void receive(const UPID& from, const ExitedExecutorMessage& message)
   {
-    Event event;
-    event.set_type(Event::FAILURE);
-
-    Event::Failure* failure = event.mutable_failure();
-
-    failure->mutable_slave_id()->CopyFrom(message.slave_id());
-    failure->mutable_executor_id()->CopyFrom(message.executor_id());
-    failure->set_status(message.status());
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
-  void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
+  void receive(const UPID& from, const ExecutorToFrameworkMessage& message)
   {
-    Event event;
-    event.set_type(Event::MESSAGE);
-
-    Event::Message* message = event.mutable_message();
-
-    message->mutable_slave_id()->CopyFrom(_message.slave_id());
-    message->mutable_executor_id()->CopyFrom(_message.executor_id());
-    message->set_data(_message.data());
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   void receive(const UPID& from, const FrameworkErrorMessage& message)
   {
-    Event event;
-    event.set_type(Event::ERROR);
-
-    Event::Error* error = event.mutable_error();
-
-    error->set_message(message.message());
-
-    receive(from, event);
+    receive(from, protobuf::scheduler::event(message));
   }
 
   // Helper for injecting an ERROR event.