You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/07/25 02:11:31 UTC
mesos git commit: Added helpers for converting scheduler messages to
Events.
Repository: mesos
Updated Branches:
refs/heads/master a6a3dcaa5 -> cb941fefc
Added helpers for converting scheduler messages to Events.
Review: https://reviews.apache.org/r/36803
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cb941fef
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cb941fef
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cb941fef
Branch: refs/heads/master
Commit: cb941fefcf73a557b8c0d25cc616ba75e88a6062
Parents: a6a3dca
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 24 16:24:54 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 24 17:10:57 2015 -0700
----------------------------------------------------------------------
src/common/protobuf_utils.cpp | 155 ++++++++++++++++++++++++++++++++++++-
src/common/protobuf_utils.hpp | 19 +++++
src/scheduler/scheduler.cpp | 121 ++++-------------------------
3 files changed, 186 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index d900707..90a2461 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include <mesos/scheduler/scheduler.hpp>
+
#include <mesos/slave/isolator.hpp>
#include <mesos/type_utils.hpp>
@@ -33,9 +35,13 @@
using std::string;
+using mesos::scheduler::Event;
+
using mesos::slave::ExecutorLimitation;
using mesos::slave::ExecutorRunState;
+using process::UPID;
+
namespace mesos {
namespace internal {
namespace protobuf {
@@ -177,7 +183,7 @@ Option<bool> getTaskHealth(const Task& task)
* @return A fully formed `MasterInfo` with the IP/hostname information
* as derived from the `UPID`.
*/
-MasterInfo createMasterInfo(const process::UPID& pid)
+MasterInfo createMasterInfo(const UPID& pid)
{
MasterInfo info;
info.set_id(stringify(pid) + "-" + UUID::random().toString());
@@ -240,6 +246,153 @@ ExecutorRunState createExecutorRunState(
}
} // namespace slave {
+
+namespace scheduler {
+
+Event event(const FrameworkRegisteredMessage& message)
+{
+ Event event;
+ event.set_type(Event::SUBSCRIBED);
+
+ Event::Subscribed* subscribed = event.mutable_subscribed();
+ subscribed->mutable_framework_id()->CopyFrom(message.framework_id());
+
+ return event;
+}
+
+
+Event event(const FrameworkReregisteredMessage& message)
+{
+ Event event;
+ event.set_type(Event::SUBSCRIBED);
+
+ Event::Subscribed* subscribed = event.mutable_subscribed();
+ subscribed->mutable_framework_id()->CopyFrom(message.framework_id());
+
+ return event;
+}
+
+
+Event event(const ResourceOffersMessage& message)
+{
+ Event event;
+ event.set_type(Event::OFFERS);
+
+ Event::Offers* offers = event.mutable_offers();
+ offers->mutable_offers()->CopyFrom(message.offers());
+
+ return event;
+}
+
+
+Event event(const RescindResourceOfferMessage& message)
+{
+ Event event;
+ event.set_type(Event::RESCIND);
+
+ Event::Rescind* rescind = event.mutable_rescind();
+ rescind->mutable_offer_id()->CopyFrom(message.offer_id());
+
+ return event;
+}
+
+
+Event event(const StatusUpdateMessage& message)
+{
+ Event event;
+ event.set_type(Event::UPDATE);
+
+ Event::Update* update = event.mutable_update();
+
+ update->mutable_status()->CopyFrom(message.update().status());
+
+ if (message.update().has_slave_id()) {
+ update->mutable_status()->mutable_slave_id()->CopyFrom(
+ message.update().slave_id());
+ }
+
+ if (message.update().has_executor_id()) {
+ update->mutable_status()->mutable_executor_id()->CopyFrom(
+ message.update().executor_id());
+ }
+
+ update->mutable_status()->set_timestamp(message.update().timestamp());
+
+ // If the update does not have a 'uuid', it does not need
+ // acknowledging. However, prior to 0.23.0, the update uuid
+ // was required and always set. In 0.24.0, we can rely on the
+ // update uuid check here, until then we must still check for
+ // this being sent from the driver (from == UPID()) or from
+ // the master (pid == UPID()).
+ //
+ // TODO(bmahler): For the HTTP API, we will have to update the
+ // master and slave to ensure the 'uuid' in TaskStatus is set
+ // correctly.
+ if (!message.update().has_uuid() || message.update().uuid() == "") {
+ update->mutable_status()->clear_uuid();
+ } else if (UPID(message.pid()) == UPID()) {
+ update->mutable_status()->clear_uuid();
+ } else {
+ update->mutable_status()->set_uuid(message.update().uuid());
+ }
+
+ return event;
+}
+
+
+Event event(const LostSlaveMessage& message)
+{
+ Event event;
+ event.set_type(Event::FAILURE);
+
+ Event::Failure* failure = event.mutable_failure();
+ failure->mutable_slave_id()->CopyFrom(message.slave_id());
+
+ return event;
+}
+
+
+Event event(const ExitedExecutorMessage& message)
+{
+ Event event;
+ event.set_type(Event::FAILURE);
+
+ Event::Failure* failure = event.mutable_failure();
+ failure->mutable_slave_id()->CopyFrom(message.slave_id());
+ failure->mutable_executor_id()->CopyFrom(message.executor_id());
+ failure->set_status(message.status());
+
+ return event;
+}
+
+
+Event event(const ExecutorToFrameworkMessage& message)
+{
+ Event event;
+ event.set_type(Event::MESSAGE);
+
+ Event::Message* message_ = event.mutable_message();
+ message_->mutable_slave_id()->CopyFrom(message.slave_id());
+ message_->mutable_executor_id()->CopyFrom(message.executor_id());
+ message_->set_data(message.data());
+
+ return event;
+}
+
+
+Event event(const FrameworkErrorMessage& message)
+{
+ Event event;
+ event.set_type(Event::ERROR);
+
+ Event::Error* error = event.mutable_error();
+ error->set_message(message.message());
+
+ return event;
+}
+
+} // namespace scheduler {
+
} // namespace protobuf {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 22046ba..5c99254 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -21,6 +21,8 @@
#include <string>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <mesos/slave/isolator.hpp>
#include <stout/ip.hpp>
@@ -90,6 +92,23 @@ mesos::slave::ExecutorRunState createExecutorRunState(
const Option<std::string>& rootfs);
} // namespace slave {
+
+namespace scheduler {
+
+// Helper functions that create scheduler::Event from a message that
+// is sent to the scheduler.
+mesos::scheduler::Event event(const FrameworkRegisteredMessage& message);
+mesos::scheduler::Event event(const FrameworkReregisteredMessage& message);
+mesos::scheduler::Event event(const ResourceOffersMessage& message);
+mesos::scheduler::Event event(const RescindResourceOfferMessage& message);
+mesos::scheduler::Event event(const StatusUpdateMessage& message);
+mesos::scheduler::Event event(const LostSlaveMessage& message);
+mesos::scheduler::Event event(const ExitedExecutorMessage& message);
+mesos::scheduler::Event event(const ExecutorToFrameworkMessage& message);
+mesos::scheduler::Event event(const FrameworkErrorMessage& message);
+
+} // namespace scheduler {
+
} // namespace protobuf {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index badc107..6887ed1 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -61,6 +61,7 @@
#include "authentication/cram_md5/authenticatee.hpp"
+#include "common/protobuf_utils.hpp"
#include "master/detector.hpp"
@@ -549,147 +550,51 @@ protected:
void receive(const UPID& from, const FrameworkRegisteredMessage& message)
{
- subscribed(from, message.framework_id());
- }
+ failover = false;
- void receive(const UPID& from, const FrameworkReregisteredMessage& message)
- {
- subscribed(from, message.framework_id());
+ receive(from, protobuf::scheduler::event(message));
}
- void subscribed(const UPID& from, const FrameworkID& frameworkId)
+ void receive(const UPID& from, const FrameworkReregisteredMessage& message)
{
- // We've now registered at least once with the master so we're no
- // longer failing over. See the comment where 'failover' is
- // declared for further details.
failover = false;
- Event event;
- event.set_type(Event::SUBSCRIBED);
-
- Event::Subscribed* subscribed = event.mutable_subscribed();
-
- subscribed->mutable_framework_id()->CopyFrom(frameworkId);
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
void receive(const UPID& from, const ResourceOffersMessage& message)
{
- Event event;
- event.set_type(Event::OFFERS);
-
- Event::Offers* offers = event.mutable_offers();
-
- offers->mutable_offers()->CopyFrom(message.offers());
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
void receive(const UPID& from, const RescindResourceOfferMessage& message)
{
- Event event;
- event.set_type(Event::RESCIND);
-
- Event::Rescind* rescind = event.mutable_rescind();
-
- rescind->mutable_offer_id()->CopyFrom(message.offer_id());
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
void receive(const UPID& from, const StatusUpdateMessage& message)
{
- Event event;
- event.set_type(Event::UPDATE);
-
- Event::Update* update = event.mutable_update();
-
- update->mutable_status()->CopyFrom(message.update().status());
-
- if (message.update().has_slave_id()) {
- update->mutable_status()->mutable_slave_id()->CopyFrom(
- message.update().slave_id());
- }
-
- if (message.update().has_executor_id()) {
- update->mutable_status()->mutable_executor_id()->CopyFrom(
- message.update().executor_id());
- }
-
- update->mutable_status()->set_timestamp(message.update().timestamp());
-
- // If the update does not have a 'uuid', it does not need
- // acknowledging. However, prior to 0.23.0, the update uuid
- // was required and always set. In 0.24.0, we can rely on the
- // update uuid check here, until then we must still check for
- // this being sent from the driver (from == UPID()) or from
- // the master (pid == UPID()).
- //
- // TODO(bmahler): For the HTTP API, we will have to update the
- // master and slave to ensure the 'uuid' in TaskStatus is set
- // correctly.
- if (!message.update().has_uuid() || message.update().uuid() == "") {
- update->mutable_status()->clear_uuid();
- } else if (UPID(message.pid()) == UPID()) {
- update->mutable_status()->clear_uuid();
- } else {
- update->mutable_status()->set_uuid(message.update().uuid());
- }
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
void receive(const UPID& from, const LostSlaveMessage& message)
{
- Event event;
- event.set_type(Event::FAILURE);
-
- Event::Failure* failure = event.mutable_failure();
-
- failure->mutable_slave_id()->CopyFrom(message.slave_id());
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
void receive(const UPID& from, const ExitedExecutorMessage& message)
{
- Event event;
- event.set_type(Event::FAILURE);
-
- Event::Failure* failure = event.mutable_failure();
-
- failure->mutable_slave_id()->CopyFrom(message.slave_id());
- failure->mutable_executor_id()->CopyFrom(message.executor_id());
- failure->set_status(message.status());
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
- void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
+ void receive(const UPID& from, const ExecutorToFrameworkMessage& message)
{
- Event event;
- event.set_type(Event::MESSAGE);
-
- Event::Message* message = event.mutable_message();
-
- message->mutable_slave_id()->CopyFrom(_message.slave_id());
- message->mutable_executor_id()->CopyFrom(_message.executor_id());
- message->set_data(_message.data());
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
void receive(const UPID& from, const FrameworkErrorMessage& message)
{
- Event event;
- event.set_type(Event::ERROR);
-
- Event::Error* error = event.mutable_error();
-
- error->set_message(message.message());
-
- receive(from, event);
+ receive(from, protobuf::scheduler::event(message));
}
// Helper for injecting an ERROR event.