You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this text rendering.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/12/08 00:03:46 UTC
[2/3] mesos git commit: Eliminated some copying of tasks / executors in agent re-registration.
Eliminated some copying of tasks / executors in agent re-registration.
Review: https://reviews.apache.org/r/64428
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4fe3bdb8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4fe3bdb8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4fe3bdb8
Branch: refs/heads/master
Commit: 4fe3bdb8ed5e8d4ddc894ff7fd5cbcd3183526be
Parents: 108eb06
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Dec 7 12:03:58 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Dec 7 16:03:23 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 79 +++++++++++++++++++++++++++-------------------
src/master/master.hpp | 9 +++---
2 files changed, 51 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4fe3bdb8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1d192db..584398c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cctype>
#include <fstream>
+#include <functional>
#include <iomanip>
#include <list>
#include <memory>
@@ -98,6 +99,7 @@
using google::protobuf::RepeatedPtrField;
using std::list;
+using std::reference_wrapper;
using std::set;
using std::shared_ptr;
using std::string;
@@ -6318,7 +6320,7 @@ void Master::__registerSlave(
++metrics->slave_registrations;
- addSlave(slave);
+ addSlave(slave, {});
Duration pingTimeout =
flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
@@ -6721,16 +6723,6 @@ void Master::__reregisterSlave(
}
};
- vector<Resource> checkpointedResources =
- google::protobuf::convert(reregisterSlaveMessage.checkpointed_resources());
- vector<ExecutorInfo> executorInfos =
- google::protobuf::convert(reregisterSlaveMessage.executor_infos());
- vector<Task> tasks =
- google::protobuf::convert(reregisterSlaveMessage.tasks());
- const vector<FrameworkInfo> frameworks =
- google::protobuf::convert(reregisterSlaveMessage.frameworks());
- vector<Archive::Framework> completedFrameworks =
- google::protobuf::convert(reregisterSlaveMessage.completed_frameworks());
vector<SlaveInfo::Capability> agentCapabilities =
google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
@@ -6740,43 +6732,47 @@ void Master::__reregisterSlave(
// If the agent is not multi-role capable, inject allocation info.
if (!slaveCapabilities.multiRole) {
- hashmap<FrameworkID, FrameworkInfo> frameworks_;
- foreach (const FrameworkInfo& framework, frameworks) {
- frameworks_[framework.id()] = framework;
+ hashmap<FrameworkID, reference_wrapper<const FrameworkInfo>> frameworks;
+
+ foreach (const FrameworkInfo& framework,
+ reregisterSlaveMessage.frameworks()) {
+ frameworks.emplace(framework.id(), framework);
}
- foreach (Task& task, tasks) {
- CHECK(frameworks_.contains(task.framework_id()));
+ foreach (Task& task, *reregisterSlaveMessage.mutable_tasks()) {
+ CHECK(frameworks.contains(task.framework_id()));
injectAllocationInfo(
task.mutable_resources(),
- frameworks_.at(task.framework_id()));
+ frameworks.at(task.framework_id()));
}
- foreach (ExecutorInfo& executor, executorInfos) {
- CHECK(frameworks_.contains(executor.framework_id()));
+ foreach (ExecutorInfo& executor,
+ *reregisterSlaveMessage.mutable_executor_infos()) {
+ CHECK(frameworks.contains(executor.framework_id()));
injectAllocationInfo(
executor.mutable_resources(),
- frameworks_.at(executor.framework_id()));
+ frameworks.at(executor.framework_id()));
}
}
// Currently, The agent always downgrades the resources such that
// a 1.4.0 agent can speak to a pre-1.4.0 master. We therefore
// unconditionally upgrade the resources back here.
- foreach (Task& task, tasks) {
+ foreach (Task& task, *reregisterSlaveMessage.mutable_tasks()) {
convertResourceFormat(
task.mutable_resources(), POST_RESERVATION_REFINEMENT);
}
- foreach (ExecutorInfo& executor, executorInfos) {
+ foreach (ExecutorInfo& executor,
+ *reregisterSlaveMessage.mutable_executor_infos()) {
convertResourceFormat(
executor.mutable_resources(), POST_RESERVATION_REFINEMENT);
}
foreach (Archive::Framework& completedFramework,
- completedFrameworks) {
+ *reregisterSlaveMessage.mutable_completed_frameworks()) {
foreach (Task& task, *completedFramework.mutable_tasks()) {
convertResourceFormat(
task.mutable_resources(), POST_RESERVATION_REFINEMENT);
@@ -6791,7 +6787,8 @@ void Master::__reregisterSlave(
// re-registering agent that are partition-aware.
hashset<FrameworkID> partitionAwareFrameworks;
- foreach (const FrameworkInfo& framework, frameworks) {
+ foreach (const FrameworkInfo& framework,
+ reregisterSlaveMessage.frameworks()) {
if (protobuf::frameworkHasCapability(
framework, FrameworkInfo::Capability::PARTITION_AWARE)) {
partitionAwareFrameworks.insert(framework.id());
@@ -6802,7 +6799,7 @@ void Master::__reregisterSlave(
// master (those tasks were previously marked "unreachable", so they
// should be removed from that collection).
vector<Task> recoveredTasks;
- foreach (Task& task, tasks) {
+ foreach (Task& task, *reregisterSlaveMessage.mutable_tasks()) {
const FrameworkID& frameworkId = task.framework_id();
// Don't re-add tasks whose framework has been shutdown at the
@@ -6819,6 +6816,11 @@ void Master::__reregisterSlave(
recoveredTasks.push_back(std::move(task));
}
+ vector<Resource> checkpointedResources = google::protobuf::convert(
+ std::move(*reregisterSlaveMessage.mutable_checkpointed_resources()));
+ vector<ExecutorInfo> executorInfos = google::protobuf::convert(
+ std::move(*reregisterSlaveMessage.mutable_executor_infos()));
+
Slave* slave = new Slave(
this,
slaveInfo,
@@ -6840,7 +6842,10 @@ void Master::__reregisterSlave(
slaves.removed.erase(slave->id);
slaves.unreachable.erase(slave->id);
- addSlave(slave, completedFrameworks);
+ vector<Archive::Framework> completedFrameworks = google::protobuf::convert(
+ std::move(*reregisterSlaveMessage.mutable_completed_frameworks()));
+
+ addSlave(slave, std::move(completedFrameworks));
Duration pingTimeout =
flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
@@ -6865,7 +6870,8 @@ void Master::__reregisterSlave(
// lost when the master fails over. Also, we only store a limited
// number of completed frameworks. A proper fix likely involves
// storing framework information in the registry (MESOS-1719).
- foreach (const FrameworkInfo& framework, frameworks) {
+ foreach (const FrameworkInfo& framework,
+ reregisterSlaveMessage.frameworks()) {
if (isCompletedFramework(framework.id())) {
LOG(INFO) << "Shutting down framework " << framework.id()
<< " at re-registered agent " << *slave
@@ -6877,6 +6883,12 @@ void Master::__reregisterSlave(
}
}
+ // TODO(bmahler): Consider moving this in to `updateSlaveFrameworks`,
+ // would be helpful when there are a large total number of frameworks
+ // in the cluster.
+ const vector<FrameworkInfo> frameworks = google::protobuf::convert(
+ std::move(*reregisterSlaveMessage.mutable_frameworks()));
+
updateSlaveFrameworks(slave, frameworks);
slaves.reregistering.erase(slaveInfo.id());
@@ -7066,6 +7078,9 @@ void Master::updateSlaveFrameworks(
Framework* framework = getFramework(frameworkInfo.id());
if (framework != nullptr) {
+ // TODO(bmahler): Copying the framework info here can be
+ // expensive, consider only sending this message when
+ // there has been a change vs what the agent reported.
UpdateFrameworkMessage message;
message.mutable_framework_id()->CopyFrom(framework->id());
message.mutable_framework_info()->CopyFrom(framework->info);
@@ -9433,7 +9448,7 @@ void Master::removeFramework(Framework* framework)
CHECK(!slaves.registered.contains(task->slave_id()));
// Move task from unreachable map to completed map.
- framework->addCompletedTask(*task);
+ framework->addCompletedTask(std::move(*task));
framework->unreachableTasks.erase(taskId);
}
@@ -9554,7 +9569,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
void Master::addSlave(
Slave* slave,
- const vector<Archive::Framework>& completedFrameworks)
+ vector<Archive::Framework>&& completedFrameworks)
{
CHECK_NOTNULL(slave);
CHECK(!slaves.registered.contains(slave->id));
@@ -9628,16 +9643,16 @@ void Master::addSlave(
//
// TODO(vinod): Reconcile the notion of a completed framework across
// the master and slave.
- foreach (const Archive::Framework& completedFramework, completedFrameworks) {
+ foreach (Archive::Framework& completedFramework, completedFrameworks) {
Framework* framework = getFramework(
completedFramework.framework_info().id());
- foreach (const Task& task, completedFramework.tasks()) {
+ foreach (Task& task, *completedFramework.mutable_tasks()) {
if (framework != nullptr) {
VLOG(2) << "Re-adding completed task " << task.task_id()
<< " of framework " << *framework
<< " that ran on agent " << *slave;
- framework->addCompletedTask(task);
+ framework->addCompletedTask(std::move(task));
} else {
// The framework might not be re-registered yet.
//
http://git-wip-us.apache.org/repos/asf/mesos/blob/4fe3bdb8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2658312..1f5daae 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -710,8 +710,7 @@ protected:
// Add a slave.
void addSlave(
Slave* slave,
- const std::vector<Archive::Framework>& completedFrameworks =
- std::vector<Archive::Framework>());
+ std::vector<Archive::Framework>&& completedFrameworks);
void _markUnreachable(
Slave* slave,
@@ -2308,14 +2307,14 @@ struct Framework
}
}
- void addCompletedTask(const Task& task)
+ void addCompletedTask(Task&& task)
{
// TODO(neilc): We currently allow frameworks to reuse the task
// IDs of completed tasks (although this is discouraged). This
// means that there might be multiple completed tasks with the
// same task ID. We should consider rejecting attempts to reuse
// task IDs (MESOS-6779).
- completedTasks.push_back(process::Owned<Task>(new Task(task)));
+ completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
}
void addUnreachableTask(const Task& task)
@@ -2343,7 +2342,7 @@ struct Framework
<< task->state();
addUnreachableTask(*task);
} else {
- addCompletedTask(*task);
+ addCompletedTask(Task(*task));
}
tasks.erase(task->task_id());