You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/28 21:58:38 UTC
mesos git commit: Updated Framework struct in master for the http api.
Repository: mesos
Updated Branches:
refs/heads/master 4b4cba24d -> 90b107a24
Updated Framework struct in master for the http api.
This change refactors the Framework struct in the master to introduce
support for HTTP frameworks:
* 'pid' becomes an optional field.
* Added an optional 'http' field.
Review: https://reviews.apache.org/r/36318
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90b107a2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90b107a2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90b107a2
Branch: refs/heads/master
Commit: 90b107a249169c6fc8b8d398b675ab9bd2df633b
Parents: 4b4cba2
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jul 28 11:53:45 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Jul 28 12:19:31 2015 -0700
----------------------------------------------------------------------
src/common/http.hpp | 8 ++
src/master/http.cpp | 6 +-
src/master/master.cpp | 183 +++++++++++++++++++++++++++------------------
src/master/master.hpp | 105 ++++++++++++++++++++++++--
4 files changed, 221 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 765860f..9e4290f 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -38,6 +38,14 @@ class Task;
extern const char APPLICATION_JSON[];
extern const char APPLICATION_PROTOBUF[];
+// Possible content-types that can be used as responses for
+// the mesos Http API.
+enum class ContentType
+{
+ PROTOBUF,
+ JSON
+};
+
JSON::Object model(const Resources& resources);
JSON::Object model(const hashmap<std::string, Resources>& roleResources);
JSON::Object model(const Attributes& attributes);
http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 3a1598f..3772e39 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -113,7 +113,11 @@ JSON::Object summarize(const Framework& framework)
JSON::Object object;
object.values["id"] = framework.id().value();
object.values["name"] = framework.info.name();
- object.values["pid"] = string(framework.pid);
+
+ // Omit pid for http frameworks.
+ if (framework.pid.isSome()) {
+ object.values["pid"] = string(framework.pid.get());
+ }
// TODO(bmahler): Use these in the webui.
object.values["used_resources"] = model(framework.totalUsedResources);
http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a8a195d..3e63184 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1659,7 +1659,7 @@ void Master::receive(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
drop(from, call, "Call is not from registered framework");
return;
}
@@ -1844,41 +1844,41 @@ void Master::_registerFramework(
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_master_info()->MergeFrom(info_);
- send(from, message);
+ framework->send(message);
return;
}
}
+ // TODO(vinod): Deprecate this in favor of authorization.
+ bool rootSubmissions = flags.root_submissions;
+
+ if (frameworkInfo.user() == "root" && rootSubmissions == false) {
+ LOG(INFO) << "Framework " << frameworkInfo.name() << " at " << from
+ << " registering as root, but root submissions are disabled"
+ << " on this cluster";
+ FrameworkErrorMessage message;
+ message.set_message("User 'root' is not allowed to run frameworks");
+ send(from, message);
+ return;
+ }
+
// Assign a new FrameworkID.
FrameworkInfo frameworkInfo_ = frameworkInfo;
frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
- Framework* framework = new Framework(frameworkInfo_, from);
+ Framework* framework = new Framework(this, frameworkInfo_, from);
LOG(INFO) << "Registering framework " << *framework
<< " with checkpointing "
<< (framework->info.checkpoint() ? "enabled" : "disabled")
<< " and capabilities " << framework->info.capabilities();
- // TODO(vinod): Deprecate this in favor of authorization.
- bool rootSubmissions = flags.root_submissions;
-
- if (framework->info.user() == "root" && rootSubmissions == false) {
- LOG(INFO) << "Framework " << *framework << " registering as root, but "
- << "root submissions are disabled on this cluster";
- FrameworkErrorMessage message;
- message.set_message("User 'root' is not allowed to run frameworks");
- send(from, message);
- delete framework;
- return;
- }
-
addFramework(framework);
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_master_info()->MergeFrom(info_);
- send(framework->pid, message);
+ framework->send(message);
}
@@ -1952,6 +1952,7 @@ void Master::_reregisterFramework(
const Future<Option<Error>>& validationError)
{
CHECK_READY(validationError);
+
if (validationError.get().isSome()) {
LOG(INFO) << "Refusing re-registration of framework " << frameworkInfo.id()
<< " (" << frameworkInfo.name() << ") " << " at " << from
@@ -2025,7 +2026,7 @@ void Master::_reregisterFramework(
// info?
LOG(INFO) << "Framework " << *framework << " failed over";
failoverFramework(framework, from);
- } else if (from != framework->pid) {
+ } else if (framework->pid != from) {
LOG(ERROR)
<< "Disallowing re-registration attempt of framework " << *framework
<< " because it is not expected from " << from;
@@ -2063,7 +2064,7 @@ void Master::_reregisterFramework(
FrameworkReregisteredMessage message;
message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
message.mutable_master_info()->MergeFrom(info_);
- send(from, message);
+ framework->send(message);
return;
}
} else {
@@ -2071,7 +2072,7 @@ void Master::_reregisterFramework(
// elected Mesos master to which either an existing scheduler or a
// failed-over one is connecting. Create a Framework object and add
// any tasks it has that have been reported by reconnecting slaves.
- Framework* framework = new Framework(frameworkInfo, from);
+ Framework* framework = new Framework(this, frameworkInfo, from);
// TODO(benh): Check for root submissions like above!
@@ -2098,7 +2099,7 @@ void Master::_reregisterFramework(
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_master_info()->MergeFrom(info_);
- send(framework->pid, message);
+ framework->send(message);
}
CHECK(frameworks.registered.contains(frameworkInfo.id()))
@@ -2172,7 +2173,7 @@ void Master::deactivateFramework(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring deactivate framework message for framework " << *framework
<< " because it is not expected from " << from;
@@ -2191,9 +2192,17 @@ void Master::disconnect(Framework* framework)
framework->connected = false;
- // Remove the framework from authenticated. This is safe because
- // a framework will always reauthenticate before (re-)registering.
- authenticated.erase(framework->pid);
+ if (framework->pid.isSome()) {
+ // Remove the framework from authenticated. This is safe because
+ // a framework will always reauthenticate before (re-)registering.
+ authenticated.erase(framework->pid.get());
+ } else {
+ CHECK_SOME(framework->http);
+
+ // Close the HTTP connection, which may already have
+ // been closed due to scheduler disconnection.
+ framework->http.get().writer.close();
+ }
deactivate(framework);
}
@@ -2275,7 +2284,7 @@ void Master::resourceRequest(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring resource request message from framework " << *framework
<< " because it is not expected from " << from;
@@ -2329,12 +2338,11 @@ void Master::launchTasks(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring launch tasks message for offers " << stringify(offerIds)
- << " of framework " << frameworkId << " from '" << from
- << "' because it is not from the registered framework '"
- << framework->pid << "'";
+ << " from '" << from << "' because it is not from the"
+ << " registered framework " << *framework;
return;
}
@@ -2882,7 +2890,11 @@ void Master::_accept(
RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
message.mutable_framework_id()->MergeFrom(framework->id());
- message.set_pid(framework->pid);
+
+ // TODO(anand): We set 'pid' to UPID() for http frameworks
+ // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+ // no longer have to set pid here for http frameworks.
+ message.set_pid(framework->pid.getOrElse(UPID()));
message.mutable_task()->MergeFrom(task_);
if (HookManager::hooksAvailable()) {
@@ -2958,7 +2970,7 @@ void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring revive offers message for framework " << *framework
<< " because it is not expected from " << from;
@@ -2997,7 +3009,7 @@ void Master::killTask(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring kill task message for task " << taskId << " of framework "
<< *framework << " because it is not expected from " << from;
@@ -3117,7 +3129,7 @@ void Master::statusUpdateAcknowledgement(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring status update acknowledgement " << UUID::fromBytes(uuid)
<< " for task " << taskId << " of framework " << *framework
@@ -3232,11 +3244,11 @@ void Master::schedulerMessage(
return;
}
- if (from != framework->pid) {
- LOG(WARNING) << "Ignoring framework message"
- << " for executor '" << executorId << "'"
- << " of framework " << *framework
- << " because it is not expected from " << from;
+ if (framework->pid != from) {
+ LOG(WARNING)
+ << "Ignoring framework message for executor " << executorId
+ << " of framework " << *framework
+ << " because it is not expected from " << from;
metrics->invalid_framework_to_executor_messages++;
return;
}
@@ -3307,7 +3319,8 @@ void Master::executorMessage(
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
message.set_data(data);
- send(framework->pid, message);
+
+ framework->send(message);
metrics->valid_executor_to_framework_messages++;
}
@@ -3712,16 +3725,23 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
CHECK_NOTNULL(slave);
// Send the latest framework pids to the slave.
- hashset<UPID> pids;
+ hashset<FrameworkID> ids;
+
foreach (const Task& task, tasks) {
Framework* framework = getFramework(task.framework_id());
- if (framework != NULL && !pids.contains(framework->pid)) {
+
+ if (framework != NULL && !ids.contains(framework->id())) {
UpdateFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
- message.set_pid(framework->pid);
+
+ // TODO(anand): We set 'pid' to UPID() for http frameworks
+ // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+ // no longer have to set pid here for http frameworks.
+ message.set_pid(framework->pid.getOrElse(UPID()));
+
send(slave->pid, message);
- pids.insert(framework->pid);
+ ids.insert(framework->id());
}
}
@@ -3905,7 +3925,7 @@ void Master::forward(
StatusUpdateMessage message;
message.mutable_update()->MergeFrom(update);
message.set_pid(acknowledgee);
- send(framework->pid, message);
+ framework->send(message);
}
@@ -3977,7 +3997,7 @@ void Master::exitedExecutor(
message.mutable_slave_id()->CopyFrom(slaveId);
message.set_status(status);
- send(framework->pid, message);
+ framework->send(message);
}
@@ -4064,7 +4084,7 @@ void Master::reconcileTasks(
return;
}
- if (from != framework->pid) {
+ if (framework->pid != from) {
LOG(WARNING)
<< "Ignoring reconcile tasks message for framework " << *framework
<< " because it is not expected from " << from;
@@ -4106,7 +4126,7 @@ void Master::_reconcileTasks(
// much logging.
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
- send(framework->pid, message);
+ framework->send(message);
}
foreachvalue (Task* task, framework->tasks) {
@@ -4139,7 +4159,7 @@ void Master::_reconcileTasks(
// much logging.
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
- send(framework->pid, message);
+ framework->send(message);
}
return;
@@ -4243,7 +4263,7 @@ void Master::_reconcileTasks(
// much logging.
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update.get());
- send(framework->pid, message);
+ framework->send(message);
}
}
}
@@ -4405,7 +4425,7 @@ void Master::offer(const FrameworkID& frameworkId,
LOG(INFO) << "Sending " << message.offers().size()
<< " offers to framework " << *framework;
- send(framework->pid, message);
+ framework->send(message);
}
@@ -4718,9 +4738,15 @@ void Master::addFramework(Framework* framework)
CHECK(!frameworks.registered.contains(framework->id()))
<< "Framework " << *framework << " already exists!";
+ CHECK_SOME(framework->pid) << "adding http framework not implemented";
+
frameworks.registered[framework->id()] = framework;
- link(framework->pid);
+ link(framework->pid.get());
+
+ // TODO(anand): For http frameworks, add a readerClosed()
+ // callback to invoke Master::exited() when the connection
+ // closes.
// Enforced by Master::registerFramework.
CHECK(roles.contains(framework->info.role()))
@@ -4742,13 +4768,13 @@ void Master::addFramework(Framework* framework)
// If the framework is authenticated, its principal should be in
// 'authenticated'. Otherwise look if it's supplied in
// FrameworkInfo.
- Option<string> principal = authenticated.get(framework->pid);
+ Option<string> principal = authenticated.get(framework->pid.get());
if (principal.isNone() && framework->info.has_principal()) {
principal = framework->info.principal();
}
- CHECK(!frameworks.principals.contains(framework->pid));
- frameworks.principals.put(framework->pid, principal);
+ CHECK(!frameworks.principals.contains(framework->pid.get()));
+ frameworks.principals.put(framework->pid.get(), principal);
// Export framework metrics if a principal is specified.
if (principal.isSome()) {
@@ -4767,7 +4793,9 @@ void Master::addFramework(Framework* framework)
// event of a scheduler failover.
void Master::failoverFramework(Framework* framework, const UPID& newPid)
{
- const UPID oldPid = framework->pid;
+ CHECK_SOME(framework->pid) << "http framework failover not implemented";
+
+ const UPID oldPid = framework->pid.get();
// There are a few failover cases to consider:
// 1. The pid has changed. In this case we definitely want to
@@ -4795,7 +4823,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_master_info()->MergeFrom(info_);
- send(newPid, message);
+ framework->send(message);
// Remove the framework's offers (if they weren't removed before).
// We do this after we have updated the pid and sent the framework
@@ -4912,6 +4940,8 @@ void Master::removeFramework(Framework* framework)
// TODO(benh): unlink(framework->pid);
+ // TODO(anand): For http frameworks, close the connection.
+
framework->unregisteredTime = Clock::now();
// The completedFramework buffer now owns the framework pointer.
@@ -4923,21 +4953,26 @@ void Master::removeFramework(Framework* framework)
roles[framework->info.role()]->removeFramework(framework);
- // Remove the framework from authenticated.
- authenticated.erase(framework->pid);
-
- CHECK(frameworks.principals.contains(framework->pid));
- const Option<string> principal = frameworks.principals[framework->pid];
-
- frameworks.principals.erase(framework->pid);
-
- // Remove the framework's message counters.
- if (principal.isSome()) {
- // Remove the metrics for the principal if this framework is the
- // last one with this principal.
- if (!frameworks.principals.containsValue(principal.get())) {
- CHECK(metrics->frameworks.contains(principal.get()));
- metrics->frameworks.erase(principal.get());
+ // TODO(anand): This only works for pid based frameworks. We would
+ // need similar authentication logic for http frameworks.
+ if (framework->pid.isSome()) {
+ // Remove the framework from authenticated.
+ authenticated.erase(framework->pid.get());
+
+ CHECK(frameworks.principals.contains(framework->pid.get()));
+ const Option<string> principal =
+ frameworks.principals[framework->pid.get()];
+
+ frameworks.principals.erase(framework->pid.get());
+
+ // Remove the framework's message counters.
+ if (principal.isSome()) {
+ // Remove the metrics for the principal if this framework is the
+ // last one with this principal.
+ if (!frameworks.principals.containsValue(principal.get())) {
+ CHECK(metrics->frameworks.contains(principal.get()));
+ metrics->frameworks.erase(principal.get());
+ }
}
}
@@ -5220,7 +5255,7 @@ void Master::_removeSlave(
<< "after recovering";
LostSlaveMessage message;
message.mutable_slave_id()->MergeFrom(slaveInfo.id());
- send(framework->pid, message);
+ framework->send(message);
}
}
@@ -5465,7 +5500,7 @@ void Master::removeOffer(Offer* offer, bool rescind)
if (rescind) {
RescindResourceOfferMessage message;
message.mutable_offer_id()->MergeFrom(offer->id());
- send(framework->pid, message);
+ framework->send(message);
}
// Remove and cancel offer removal timers. Canceling the Timers is
http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2c924ad..879e3d8 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -52,7 +52,9 @@
#include <stout/hashset.hpp>
#include <stout/multihashmap.hpp>
#include <stout/option.hpp>
+#include <stout/recordio.hpp>
+#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
@@ -830,6 +832,7 @@ private:
Master(const Master&); // No copying.
Master& operator = (const Master&); // No assigning.
+ friend struct Framework;
friend struct Metrics;
// NOTE: Since 'getOffer' and 'slaves' are protected,
@@ -1211,15 +1214,22 @@ private:
};
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const Framework& framework);
+
+
// Information about a connected or completed framework.
// TODO(bmahler): Keeping the task and executor information in sync
// across the Slave and Framework structs is error prone!
struct Framework
{
- Framework(const FrameworkInfo& _info,
+ Framework(Master* const _master,
+ const FrameworkInfo& _info,
const process::UPID& _pid,
const process::Time& time = process::Clock::now())
- : info(_info),
+ : master(_master),
+ info(_info),
pid(_pid),
connected(true),
active(true),
@@ -1227,7 +1237,49 @@ struct Framework
reregisteredTime(time),
completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
- ~Framework() {}
+ Framework(Master* const _master,
+ const FrameworkInfo& _info,
+ const process::http::Pipe::Writer& writer,
+ ContentType contentType,
+ const process::Time& time = process::Clock::now())
+ : master(_master),
+ info(_info),
+ connected(true),
+ active(true),
+ registeredTime(time),
+ reregisteredTime(time),
+ completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK)
+ {
+ // TODO(anand): This logic needs to be invoked each
+ // time the framework connects via http. Move it to
+ // a method instead, that gets invoked from
+ // addFramework and failoverFramework.
+
+ auto serialize = [contentType](const scheduler::Event& event) {
+ switch (contentType) {
+ case ContentType::PROTOBUF: {
+ return event.SerializeAsString();
+ }
+ case ContentType::JSON: {
+ JSON::Object object = JSON::Protobuf(event);
+ return stringify(object);
+ }
+ }
+ };
+
+ auto encoder = recordio::Encoder<scheduler::Event>(serialize);
+
+ http = Http {writer, encoder};
+ }
+
+ ~Framework()
+ {
+ if (http.isSome() && connected) {
+ if (!http.get().writer.close()) {
+ LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
+ }
+ }
+ }
Task* getTask(const TaskID& taskId)
{
@@ -1270,6 +1322,29 @@ struct Framework
}
}
+ // Sends a message to the connected framework.
+ template <typename Message>
+ void send(const Message& message)
+ {
+ if (!connected) {
+ LOG(WARNING) << "Master attempted to send message to disconnected"
+ << " framework " << *this;
+ return;
+ }
+
+ if (http.isSome()) {
+ const scheduler::Event event = protobuf::scheduler::event(message);
+
+ if (!http.get().writer.write(http.get().encoder.encode(event))) {
+ LOG(WARNING) << "Unable to send event to framework " << *this << ":"
+ << " connection closed";
+ }
+ } else {
+ CHECK_SOME(pid);
+ master->send(pid.get(), message);
+ }
+ }
+
void addCompletedTask(const Task& task)
{
// TODO(adam-mesos): Check if completed task already exists.
@@ -1420,9 +1495,22 @@ struct Framework
}
}
+ Master* const master;
+
FrameworkInfo info;
- process::UPID pid;
+ struct Http
+ {
+ process::http::Pipe::Writer writer;
+ recordio::Encoder<scheduler::Event> encoder;
+ };
+
+ // Frameworks can either be connected via HTTP or by message
+ // passing (scheduler driver). Exactly one of 'http' and 'pid'
+ // will be set according to the last connection made by the
+ // framework.
+ Option<Http> http;
+ Option<process::UPID> pid;
// Framework becomes disconnected when the socket closes.
bool connected;
@@ -1493,8 +1581,13 @@ inline std::ostream& operator << (
{
// TODO(vinod): Also log the hostname once FrameworkInfo is properly
// updated on framework failover (MESOS-1784).
- return stream << framework.id() << " (" << framework.info.name()
- << ") at " << framework.pid;
+ stream << framework.id() << " (" << framework.info.name() << ")";
+
+ if (framework.pid.isSome()) {
+ stream << " at " << framework.pid.get();
+ }
+
+ return stream;
}