You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/12/06 20:21:47 UTC
[2/2] mesos git commit: Changed agent reregistration to work with
messages directly.
Changed agent reregistration to work with messages directly.
`reregisterSlave` now accepts `ReregisterSlaveMessage&&`, which opts out
of using a protobuf arena and allows passing the message through the dispatch
chain without making any copies. Conversion of repeated message fields
to `std::vector`s is performed only when needed.
Review: https://reviews.apache.org/r/63914/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b422804
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b422804
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b422804
Branch: refs/heads/master
Commit: 8b4228049292cab6a8c4a3680de3c1aa5e72a9ff
Parents: 4934eb7
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Wed Dec 6 09:03:49 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Wed Dec 6 12:21:27 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 194 ++++++++++-------------------
src/master/master.hpp | 38 +-----
src/master/validation.cpp | 15 +--
src/master/validation.hpp | 6 +-
src/tests/master_validation_tests.cpp | 57 +++++----
5 files changed, 107 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 16cdde7..d66f956 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -853,16 +853,7 @@ void Master::initialize()
&RegisterSlaveMessage::resource_version_uuids);
install<ReregisterSlaveMessage>(
- &Master::reregisterSlave,
- &ReregisterSlaveMessage::slave,
- &ReregisterSlaveMessage::checkpointed_resources,
- &ReregisterSlaveMessage::executor_infos,
- &ReregisterSlaveMessage::tasks,
- &ReregisterSlaveMessage::frameworks,
- &ReregisterSlaveMessage::completed_frameworks,
- &ReregisterSlaveMessage::version,
- &ReregisterSlaveMessage::agent_capabilities,
- &ReregisterSlaveMessage::resource_version_uuids);
+ &Master::reregisterSlave);
install<UnregisterSlaveMessage>(
&Master::unregisterSlave,
@@ -6302,15 +6293,7 @@ void Master::__registerSlave(
void Master::reregisterSlave(
const UPID& from,
- const SlaveInfo& slaveInfo,
- const vector<Resource>& checkpointedResources,
- const vector<ExecutorInfo>& executorInfos,
- const vector<Task>& tasks,
- const vector<FrameworkInfo>& frameworks,
- const vector<Archive::Framework>& completedFrameworks,
- const string& version,
- const vector<SlaveInfo::Capability>& agentCapabilities,
- const vector<ResourceVersionUUID>& resourceVersions)
+ ReregisterSlaveMessage&& reregisterSlaveMessage)
{
++metrics->messages_reregister_slave;
@@ -6322,15 +6305,7 @@ void Master::reregisterSlave(
.onReady(defer(self(),
&Self::reregisterSlave,
from,
- slaveInfo,
- checkpointedResources,
- executorInfos,
- tasks,
- frameworks,
- completedFrameworks,
- version,
- agentCapabilities,
- resourceVersions));
+ std::move(reregisterSlaveMessage)));
return;
}
@@ -6351,6 +6326,7 @@ void Master::reregisterSlave(
// capabilities or a higher version (or a changed SlaveInfo, after Mesos 1.5).
// However, this should very rarely happen in practice, and nobody seems to
// have complained about it so far.
+ const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave();
if (slaves.reregistering.contains(slaveInfo.id())) {
LOG(INFO)
<< "Ignoring re-register agent message from agent "
@@ -6377,22 +6353,8 @@ void Master::reregisterSlave(
return;
}
- Option<Error> error = validation::master::message::reregisterSlave(
- slaveInfo, tasks, checkpointedResources, executorInfos, frameworks);
-
- // Update all resources passed by the agent to `POST_RESERVATION_REFINEMENT`
- // format. We do this as early as possible so that we only use a single
- // format inside master, and downgrade again if necessary when they leave the
- // master (e.g. when writing to the registry).
- // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks`
- // here for consistency.
- SlaveInfo _slaveInfo(slaveInfo);
- convertResourceFormat(
- _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT);
-
- std::vector<Resource> _checkpointedResources(checkpointedResources);
- convertResourceFormat(
- &_checkpointedResources, POST_RESERVATION_REFINEMENT);
+ Option<Error> error =
+ validation::master::message::reregisterSlave(reregisterSlaveMessage);
if (error.isSome()) {
LOG(WARNING) << "Dropping re-registration of agent at " << from
@@ -6407,6 +6369,20 @@ void Master::reregisterSlave(
slaves.reregistering.insert(slaveInfo.id());
+ // Update all resources passed by the agent to `POST_RESERVATION_REFINEMENT`
+ // format. We do this as early as possible so that we only use a single
+ // format inside master, and downgrade again if necessary when they leave the
+ // master (e.g. when writing to the registry).
+ // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks`
+ // here for consistency.
+ convertResourceFormat(
+ reregisterSlaveMessage.mutable_slave()->mutable_resources(),
+ POST_RESERVATION_REFINEMENT);
+
+ convertResourceFormat(
+ reregisterSlaveMessage.mutable_checkpointed_resources(),
+ POST_RESERVATION_REFINEMENT);
+
// Note that the principal may be empty if authentication is not
// required. Also it is passed along because it may be removed from
// `authenticated` while the authorization is pending.
@@ -6415,36 +6391,22 @@ void Master::reregisterSlave(
authorizeSlave(principal)
.onAny(defer(self(),
&Self::_reregisterSlave,
- _slaveInfo,
from,
+ std::move(reregisterSlaveMessage),
principal,
- _checkpointedResources,
- executorInfos,
- tasks,
- frameworks,
- completedFrameworks,
- version,
- agentCapabilities,
- resourceVersions,
lambda::_1));
}
void Master::_reregisterSlave(
- const SlaveInfo& slaveInfo,
const UPID& pid,
+ ReregisterSlaveMessage&& reregisterSlaveMessage,
const Option<string>& principal,
- const vector<Resource>& checkpointedResources,
- const vector<ExecutorInfo>& executorInfos,
- const vector<Task>& tasks,
- const vector<FrameworkInfo>& frameworks,
- const vector<Archive::Framework>& completedFrameworks,
- const string& version,
- const vector<SlaveInfo::Capability>& agentCapabilities,
- const vector<ResourceVersionUUID>& resourceVersions,
const Future<bool>& authorized)
{
CHECK(!authorized.isDiscarded());
+
+ const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave();
CHECK(slaves.reregistering.contains(slaveInfo.id()));
Option<string> authorizationError = None();
@@ -6516,6 +6478,7 @@ void Master::_reregisterSlave(
// Ignore re-registration attempts by agents running old Mesos versions.
// We expect that the agent's version is in SemVer format; if the
// version cannot be parsed, the re-registration attempt is ignored.
+ const string& version = reregisterSlaveMessage.version();
Try<Version> parsedVersion = Version::parse(version);
if (parsedVersion.isError()) {
@@ -6587,27 +6550,15 @@ void Master::_reregisterSlave(
// previously known state.
if (slaveInfo == slave->info) {
___reregisterSlave(
- slaveInfo,
pid,
- executorInfos,
- tasks,
- frameworks,
- version,
- agentCapabilities,
- resourceVersions,
+ std::move(reregisterSlaveMessage),
true);
} else {
registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
.onAny(defer(self(),
&Self::___reregisterSlave,
- slaveInfo,
pid,
- executorInfos,
- tasks,
- frameworks,
- version,
- agentCapabilities,
- resourceVersions,
+ std::move(reregisterSlaveMessage),
lambda::_1));
}
} else if (slaves.recovered.contains(slaveInfo.id())) {
@@ -6624,31 +6575,15 @@ void Master::_reregisterSlave(
// previously known state (see also MESOS-7711).
if (slaveInfo == recoveredInfo) {
__reregisterSlave(
- slaveInfo,
pid,
- checkpointedResources,
- executorInfos,
- tasks,
- frameworks,
- completedFrameworks,
- version,
- agentCapabilities,
- resourceVersions,
+ std::move(reregisterSlaveMessage),
true);
} else {
registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
.onAny(defer(self(),
&Self::__reregisterSlave,
- slaveInfo,
pid,
- checkpointedResources,
- executorInfos,
- tasks,
- frameworks,
- completedFrameworks,
- version,
- agentCapabilities,
- resourceVersions,
+ std::move(reregisterSlaveMessage),
lambda::_1));
}
} else {
@@ -6663,34 +6598,19 @@ void Master::_reregisterSlave(
registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo)))
.onAny(defer(self(),
&Self::__reregisterSlave,
- slaveInfo,
pid,
- checkpointedResources,
- executorInfos,
- tasks,
- frameworks,
- completedFrameworks,
- version,
- agentCapabilities,
- resourceVersions,
+ std::move(reregisterSlaveMessage),
lambda::_1));
}
}
void Master::__reregisterSlave(
- const SlaveInfo& slaveInfo,
const UPID& pid,
- const vector<Resource>& checkpointedResources,
- const vector<ExecutorInfo>& executorInfos_,
- const vector<Task>& tasks_,
- const vector<FrameworkInfo>& frameworks,
- const vector<Archive::Framework>& completedFrameworks_,
- const string& version,
- const vector<SlaveInfo::Capability>& agentCapabilities,
- const vector<ResourceVersionUUID>& resourceVersions,
+ ReregisterSlaveMessage&& reregisterSlaveMessage,
const Future<bool>& future)
{
+ const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave();
CHECK(slaves.reregistering.contains(slaveInfo.id()));
if (future.isFailed()) {
@@ -6752,12 +6672,22 @@ void Master::__reregisterSlave(
}
};
+ vector<Resource> checkpointedResources =
+ google::protobuf::convert(reregisterSlaveMessage.checkpointed_resources());
+ vector<ExecutorInfo> executorInfos =
+ google::protobuf::convert(reregisterSlaveMessage.executor_infos());
+ vector<Task> tasks =
+ google::protobuf::convert(reregisterSlaveMessage.tasks());
+ const vector<FrameworkInfo> frameworks =
+ google::protobuf::convert(reregisterSlaveMessage.frameworks());
+ vector<Archive::Framework> completedFrameworks =
+ google::protobuf::convert(reregisterSlaveMessage.completed_frameworks());
+ vector<SlaveInfo::Capability> agentCapabilities =
+ google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
+
// Adjust the agent's task and executor infos to ensure
// compatibility with old agents without certain capabilities.
protobuf::slave::Capabilities slaveCapabilities(agentCapabilities);
- vector<Task> tasks = tasks_;
- vector<ExecutorInfo> executorInfos = executorInfos_;
- vector<Archive::Framework> completedFrameworks = completedFrameworks_;
// If the agent is not multi-role capable, inject allocation info.
if (!slaveCapabilities.multiRole) {
@@ -6845,14 +6775,14 @@ void Master::__reregisterSlave(
slaveInfo,
pid,
machineId,
- version,
- agentCapabilities,
+ reregisterSlaveMessage.version(),
+ std::move(agentCapabilities),
Clock::now(),
- checkpointedResources,
+ std::move(checkpointedResources),
protobuf::parseResourceVersions(
- {resourceVersions.begin(), resourceVersions.end()}),
- executorInfos,
- recoveredTasks);
+ reregisterSlaveMessage.resource_version_uuids()),
+ std::move(executorInfos),
+ std::move(recoveredTasks));
slave->reregisteredTime = Clock::now();
@@ -6905,16 +6835,11 @@ void Master::__reregisterSlave(
void Master::___reregisterSlave(
- const SlaveInfo& slaveInfo,
const process::UPID& pid,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<FrameworkInfo>& frameworks,
- const std::string& version,
- const std::vector<SlaveInfo::Capability>& agentCapabilities,
- const vector<ResourceVersionUUID>& resourceVersions,
+ ReregisterSlaveMessage&& reregisterSlaveMessage,
const process::Future<bool>& updated)
{
+ const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave();
CHECK(slaves.reregistering.contains(slaveInfo.id()));
CHECK_READY(updated);
@@ -6965,6 +6890,12 @@ void Master::___reregisterSlave(
slave->pid = pid;
link(slave->pid);
+ const string& version = reregisterSlaveMessage.version();
+ const vector<SlaveInfo::Capability> agentCapabilities =
+ google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
+ const vector<ResourceVersionUUID> resourceVersions =
+ google::protobuf::convert(reregisterSlaveMessage.resource_version_uuids());
+
Try<Nothing> stateUpdated =
slave->update(slaveInfo, version, agentCapabilities, resourceVersions);
@@ -6993,6 +6924,13 @@ void Master::___reregisterSlave(
slave->totalResources,
agentCapabilities);
+ const vector<ExecutorInfo> executorInfos =
+ google::protobuf::convert(reregisterSlaveMessage.executor_infos());
+ const vector<Task> tasks =
+ google::protobuf::convert(reregisterSlaveMessage.tasks());
+ const vector<FrameworkInfo> frameworks =
+ google::protobuf::convert(reregisterSlaveMessage.frameworks());
+
// Reconcile tasks between master and slave, and send the
// `SlaveReregisteredMessage`.
reconcileKnownSlave(slave, executorInfos, tasks);
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d42acae..acea507 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -475,15 +475,7 @@ public:
void reregisterSlave(
const process::UPID& from,
- const SlaveInfo& slaveInfo,
- const std::vector<Resource>& checkpointedResources,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<FrameworkInfo>& frameworks,
- const std::vector<Archive::Framework>& completedFrameworks,
- const std::string& version,
- const std::vector<SlaveInfo::Capability>& agentCapabilities,
- const std::vector<ResourceVersionUUID>& resourceVersions);
+ ReregisterSlaveMessage&& incomingMessage);
void unregisterSlave(
const process::UPID& from,
@@ -616,41 +608,19 @@ protected:
const process::Future<bool>& admit);
void _reregisterSlave(
- const SlaveInfo& slaveInfo,
const process::UPID& pid,
+ ReregisterSlaveMessage&& incomingMessage,
const Option<std::string>& principal,
- const std::vector<Resource>& checkpointedResources,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<FrameworkInfo>& frameworks,
- const std::vector<Archive::Framework>& completedFrameworks,
- const std::string& version,
- const std::vector<SlaveInfo::Capability>& agentCapabilities,
- const std::vector<ResourceVersionUUID>& resourceVersions,
const process::Future<bool>& authorized);
void __reregisterSlave(
- const SlaveInfo& slaveInfo,
const process::UPID& pid,
- const std::vector<Resource>& checkpointedResources,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<FrameworkInfo>& frameworks,
- const std::vector<Archive::Framework>& completedFrameworks,
- const std::string& version,
- const std::vector<SlaveInfo::Capability>& agentCapabilities,
- const std::vector<ResourceVersionUUID>& resourceVersions,
+ ReregisterSlaveMessage&& incomingMessage,
const process::Future<bool>& readmit);
void ___reregisterSlave(
- const SlaveInfo& slaveInfo,
const process::UPID& pid,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<FrameworkInfo>& frameworks,
- const std::string& version,
- const std::vector<SlaveInfo::Capability>& agentCapabilities,
- const std::vector<ResourceVersionUUID>& resourceVersions,
+ ReregisterSlaveMessage&& incomingMessage,
const process::Future<bool>& updated);
void updateSlaveFrameworks(
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 8b5848b..bf7ae65 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -292,28 +292,25 @@ Option<Error> registerSlave(
Option<Error> reregisterSlave(
- const SlaveInfo& slaveInfo,
- const vector<Task>& tasks,
- const vector<Resource>& resources,
- const vector<ExecutorInfo>& executorInfos,
- const vector<FrameworkInfo>& frameworkInfos)
+ const ReregisterSlaveMessage& message)
{
hashset<FrameworkID> frameworkIDs;
hashset<pair<FrameworkID, ExecutorID>> executorIDs;
+ const SlaveInfo& slaveInfo = message.slave();
Option<Error> error = validateSlaveInfo(slaveInfo);
if (error.isSome()) {
return error.get();
}
- foreach (const Resource& resource, resources) {
+ foreach (const Resource& resource, message.checkpointed_resources()) {
Option<Error> error = Resources::validate(resource);
if (error.isSome()) {
return error.get();
}
}
- foreach (const FrameworkInfo& framework, frameworkInfos) {
+ foreach (const FrameworkInfo& framework, message.frameworks()) {
Option<Error> error = validation::framework::validate(framework);
if (error.isSome()) {
return error.get();
@@ -327,7 +324,7 @@ Option<Error> reregisterSlave(
frameworkIDs.insert(framework.id());
}
- foreach (const ExecutorInfo& executor, executorInfos) {
+ foreach (const ExecutorInfo& executor, message.executor_infos()) {
Option<Error> error = validation::executor::validate(executor);
if (error.isSome()) {
return error.get();
@@ -362,7 +359,7 @@ Option<Error> reregisterSlave(
}
}
- foreach (const Task& task, tasks) {
+ foreach (const Task& task, message.tasks()) {
Option<Error> error = common::validation::validateTaskID(task.task_id());
if (error.isSome()) {
return Error("Task has an invalid TaskID: " + error->message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index ac54062..30db3bf 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -71,11 +71,7 @@ Option<Error> registerSlave(
const std::vector<Resource>& checkpointedResources);
Option<Error> reregisterSlave(
- const SlaveInfo& slaveInfo,
- const std::vector<Task>& tasks,
- const std::vector<Resource>& resources,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<FrameworkInfo>& frameworkInfos);
+ const ReregisterSlaveMessage& message);
} // namespace message {
} // namespace master {
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 6398f16..48c69a5 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -4215,43 +4215,46 @@ TEST_F(RegisterSlaveValidationTest, DropInvalidRegistration)
// validating the ReregisterSlaveMessage.
TEST_F(RegisterSlaveValidationTest, DuplicateExecutorID)
{
- SlaveInfo slaveInfo;
- slaveInfo.mutable_id()->set_value("agent-id");
- slaveInfo.mutable_resources()->CopyFrom(
- Resources::parse("cpus:2;mem:10").get());
+ ReregisterSlaveMessage message;
- vector<Task> tasks;
- vector<Resource> resources;
- vector<ExecutorInfo> executors;
- vector<FrameworkInfo> frameworks;
+ SlaveInfo *slaveInfo = message.mutable_slave();
+ slaveInfo->mutable_id()->set_value("agent-id");
+ slaveInfo->mutable_resources()->CopyFrom(
+ Resources::parse("cpus:2;mem:10").get());
- frameworks.push_back(DEFAULT_FRAMEWORK_INFO);
- frameworks.back().set_name("framework1");
- frameworks.back().mutable_id()->set_value("framework1");
+ FrameworkInfo *framework = message.add_frameworks();
+ framework->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ framework->set_name("framework1");
+ framework->mutable_id()->set_value("framework1");
- frameworks.push_back(DEFAULT_FRAMEWORK_INFO);
- frameworks.back().set_name("framework2");
- frameworks.back().mutable_id()->set_value("framework2");
+ framework = message.add_frameworks();
+ framework->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ framework->set_name("framework2");
+ framework->mutable_id()->set_value("framework2");
- executors.push_back(DEFAULT_EXECUTOR_INFO);
- executors.back().mutable_framework_id()->set_value("framework1");
+ ExecutorInfo *executor = message.add_executor_infos();
+ executor->CopyFrom(DEFAULT_EXECUTOR_INFO);
+ executor->mutable_framework_id()->set_value("framework1");
- executors.push_back(DEFAULT_EXECUTOR_INFO);
- executors.back().mutable_framework_id()->set_value("framework2");
+ executor = message.add_executor_infos();
+ executor->CopyFrom(DEFAULT_EXECUTOR_INFO);
+ executor->mutable_framework_id()->set_value("framework2");
// Executors with the same ID in different frameworks are allowed.
- EXPECT_EQ(executors[0].executor_id(), executors[1].executor_id());
- EXPECT_NE(executors[0].framework_id(), executors[1].framework_id());
- EXPECT_NONE(master::validation::master::message::reregisterSlave(
- slaveInfo, tasks, resources, executors, frameworks));
+ EXPECT_EQ(message.executor_infos(0).executor_id(),
+ message.executor_infos(1).executor_id());
+ EXPECT_NE(message.executor_infos(0).framework_id(),
+ message.executor_infos(1).framework_id());
+ EXPECT_NONE(master::validation::master::message::reregisterSlave(message));
- executors[1].mutable_framework_id()->set_value("framework1");
+ executor->mutable_framework_id()->set_value("framework1");
// Executors with the same ID in in the same framework are not allowed.
- EXPECT_EQ(executors[0].executor_id(), executors[1].executor_id());
- EXPECT_EQ(executors[0].framework_id(), executors[1].framework_id());
- EXPECT_SOME(master::validation::master::message::reregisterSlave(
- slaveInfo, tasks, resources, executors, frameworks));
+ EXPECT_EQ(message.executor_infos(0).executor_id(),
+ message.executor_infos(1).executor_id());
+ EXPECT_EQ(message.executor_infos(0).framework_id(),
+ message.executor_infos(1).framework_id());
+ EXPECT_SOME(master::validation::master::message::reregisterSlave(message));
}
} // namespace tests {