You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2018/01/03 07:01:45 UTC
[4/4] mesos git commit: Used the protobuf-reflection-based
'downgradeResources' utility.
Used the protobuf-reflection-based 'downgradeResources' utility.
Review: https://reviews.apache.org/r/63977
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e9e65674
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e9e65674
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e9e65674
Branch: refs/heads/master
Commit: e9e656747b4da88498e07b44f6bc5fee2a64dad8
Parents: db8984f
Author: Michael Park <mp...@apache.org>
Authored: Mon Dec 11 16:09:41 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Jan 2 22:42:44 2018 -0800
----------------------------------------------------------------------
src/master/master.cpp | 43 +++-------
src/slave/slave.cpp | 137 +++++++-------------------------
src/slave/state.hpp | 41 ++++++++--
src/tests/slave_recovery_tests.cpp | 7 +-
4 files changed, 82 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index bba70c3..282fdf8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5004,20 +5004,13 @@ void Master::_accept(
slave->info));
}
- // If the agent does not support reservation refinement,
- // downgrade the task and executor resources to the
- // "pre-reservation-refinement" format. This cannot fail
- // since the master rejects attempts to create refined
- // reservations on non-capable agents.
+ // If the agent does not support reservation refinement, downgrade
+ // the task / executor resources to the "pre-reservation-refinement"
+ // format. The message cannot contain any refined reservations since
+ // the master rejects attempts to create refined reservations
+ // on non-capable agents.
if (!slave->capabilities.reservationRefinement) {
- TaskInfo& task = *message.mutable_task();
-
- CHECK_SOME(downgradeResources(task.mutable_resources()));
-
- if (task.has_executor()) {
- CHECK_SOME(downgradeResources(
- task.mutable_executor()->mutable_resources()));
- }
+ CHECK_SOME(downgradeResources(&message));
}
// TODO(bmahler): Consider updating this log message to
@@ -5208,21 +5201,11 @@ void Master::_accept(
// If the agent does not support reservation refinement, downgrade
// the task and executor resources to the "pre-reservation-refinement"
- // format. This cannot fail since the master rejects attempts to
- // create refined reservations on non-capable agents.
+ // format. The message cannot contain any refined reservations since
+ // the master rejects attempts to create refined reservations
+ // on non-capable agents.
if (!slave->capabilities.reservationRefinement) {
- CHECK_SOME(downgradeResources(
- message.mutable_executor()->mutable_resources()));
-
- foreach (
- TaskInfo& task, *message.mutable_task_group()->mutable_tasks()) {
- CHECK_SOME(downgradeResources(task.mutable_resources()));
-
- if (task.has_executor()) {
- CHECK_SOME(downgradeResources(
- task.mutable_executor()->mutable_resources()));
- }
- }
+ CHECK_SOME(downgradeResources(&message));
}
LOG(INFO) << "Launching task group " << stringify(taskIds)
@@ -7148,9 +7131,9 @@ void Master::___reregisterSlave(
//
// TODO(neilc): It would probably be better to prevent the agent
// from re-registering in this scenario.
- Try<Nothing> result = downgradeResources(message.mutable_resources());
+ Try<Nothing> result = downgradeResources(&message);
if (result.isError()) {
- LOG(WARNING) << "Not sending updated checkpointed resouces "
+ LOG(WARNING) << "Not sending updated checkpointed resources "
<< slave->checkpointedResources
<< " with refined reservations, since agent " << *slave
<< " is not RESERVATION_REFINEMENT-capable.";
@@ -10653,7 +10636,7 @@ void Master::_apply(
//
// TODO(neilc): It would probably be better to prevent the agent
// from re-registering in this scenario.
- Try<Nothing> result = downgradeResources(message.mutable_resources());
+ Try<Nothing> result = downgradeResources(&message);
if (result.isError()) {
LOG(WARNING) << "Not sending updated checkpointed resources "
<< slave->checkpointedResources
http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ef2394..cfb675d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1294,17 +1294,7 @@ void Slave::registered(
VLOG(1) << "Checkpointing SlaveInfo to '" << path << "'";
- {
- // The `SlaveInfo.resources` does not include dynamic reservations,
- // which means it cannot contain reservation refinements, so
- // `downgradeResources` should always succeed.
- SlaveInfo info_ = info;
-
- Try<Nothing> result = downgradeResources(info_.mutable_resources());
- CHECK_SOME(result);
-
- CHECK_SOME(state::checkpoint(path, info_));
- }
+ CHECK_SOME(state::checkpoint(path, info));
// We start the local resource providers daemon once the agent is
// running, so the resource providers can use the agent API.
@@ -1531,43 +1521,31 @@ void Slave::doReliableRegistration(Duration maxBackoff)
// See MESOS-5330.
link(master.get());
- SlaveInfo slaveInfo = info;
-
- // The `SlaveInfo.resources` does not include dynamic reservations,
- // which means it cannot contain reservation refinements, so
- // `downgradeResources` should always succeed.
- Try<Nothing> result = downgradeResources(slaveInfo.mutable_resources());
- CHECK_SOME(result);
-
- RepeatedPtrField<Resource> checkpointedResources_ = checkpointedResources;
-
- // If the checkpointed resources don't have reservation refinements,
- // send them to the master in "pre-reservation-refinement" format
- // for backward compatibility with old masters. If downgrading is
- // not possible without losing information, send the resources in
- // the "post-reservation-refinement" format. We ignore the return
- // value of `downgradeResources` because for now, we send the result
- // either way.
- //
- // TODO(mpark): Do something smarter with the result once something
- // like a master capability is introduced.
- downgradeResources(&checkpointedResources_);
-
- if (!slaveInfo.has_id()) {
+ if (!info.has_id()) {
// Registering for the first time.
RegisterSlaveMessage message;
message.set_version(MESOS_VERSION);
- message.mutable_slave()->CopyFrom(slaveInfo);
+ message.mutable_slave()->CopyFrom(info);
message.mutable_agent_capabilities()->CopyFrom(
capabilities.toRepeatedPtrField());
// Include checkpointed resources.
- message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
+ message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
message.mutable_resource_version_uuid()->set_value(
resourceVersion.toBytes());
+ // If the `Try` from `downgradeResources` returns an `Error`, we currently
+ // continue to send the resources to the master in a partially downgraded
+ // state. This implies that an agent with refined reservations cannot work
+ // with versions of master before reservation refinement support, which was
+ // introduced in 1.4.0.
+ //
+ // TODO(mpark): Do something smarter with the result once something
+ // like a master capability is introduced.
+ downgradeResources(&message);
+
send(master.get(), message);
} else {
// Re-registering, so send tasks running.
@@ -1578,12 +1556,12 @@ void Slave::doReliableRegistration(Duration maxBackoff)
capabilities.toRepeatedPtrField());
// Include checkpointed resources.
- message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
+ message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
message.mutable_resource_version_uuid()->set_value(
resourceVersion.toBytes());
- message.mutable_slave()->CopyFrom(slaveInfo);
+ message.mutable_slave()->CopyFrom(info);
foreachvalue (Framework* framework, frameworks) {
message.add_frameworks()->CopyFrom(framework->info);
@@ -1671,29 +1649,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
}
}
- // If the resources don't have reservation refinements, send them
- // to the master in "pre-reservation-refinement" format for backward
- // compatibility with old masters. If downgrading is not possible
- // without losing information, send the resources in the
- // "post-reservation-refinement" format. We ignore the return value of
- // `downgradeResources` because for now, we send the result either way.
+ // If the `Try` from `downgradeResources` returns an `Error`, we currently
+ // continue to send the resources to the master in a partially downgraded
+ // state. This implies that an agent with refined reservations cannot work
+ // with versions of master before reservation refinement support, which was
+ // introduced in 1.4.0.
//
// TODO(mpark): Do something smarter with the result once something
// like a master capability is introduced.
- foreach (Task& task, *message.mutable_tasks()) {
- downgradeResources(task.mutable_resources());
- }
-
- foreach (ExecutorInfo& executor, *message.mutable_executor_infos()) {
- downgradeResources(executor.mutable_resources());
- }
-
- foreach (Archive::Framework& completedFramework,
- *message.mutable_completed_frameworks()) {
- foreach (Task& task, *completedFramework.mutable_tasks()) {
- downgradeResources(task.mutable_resources());
- }
- }
+ downgradeResources(&message);
CHECK_SOME(master);
send(master.get(), message);
@@ -3683,27 +3647,10 @@ void Slave::checkpointResources(
// we avoid a case of inconsistency between the master and the agent if
// the agent restarts during handling of CheckpointResourcesMessage.
- {
- // If the checkpointed resources don't have reservation refinements,
- // checkpoint them on the agent in "pre-reservation-refinement" format
- // for backward compatibility with old agents. If downgrading is
- // not possible without losing information, checkpoint the resources in
- // the "post-reservation-refinement" format. We ignore the return
- // value of `downgradeResources` because for now, we checkpoint the result
- // either way.
- //
- // TODO(mpark): Do something smarter with the result once something
- // like agent capability requirements is introduced.
- RepeatedPtrField<Resource> newCheckpointedResources_ =
- newCheckpointedResources;
-
- downgradeResources(&newCheckpointedResources_);
-
- CHECK_SOME(state::checkpoint(
- paths::getResourcesTargetPath(metaDir),
- newCheckpointedResources_))
- << "Failed to checkpoint resources target " << newCheckpointedResources_;
- }
+ CHECK_SOME(state::checkpoint(
+ paths::getResourcesTargetPath(metaDir),
+ newCheckpointedResources))
+ << "Failed to checkpoint resources target " << newCheckpointedResources;
Try<Nothing> syncResult = syncCheckpointedResources(
newCheckpointedResources);
@@ -9006,21 +8953,7 @@ void Executor::checkpointExecutor()
VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
- {
- // If the checkpointed resources don't have reservation refinements,
- // checkpoint them on the agent in "pre-reservation-refinement" format
- // for backward compatibility with old agents. If downgrading is
- // not possible without losing information, checkpoint the resources in
- // the "post-reservation-refinement" format. We ignore the return
- // value of `downgradeResources` because for now, we checkpoint the result
- // either way.
- //
- // TODO(mpark): Do something smarter with the result once something
- // like agent capability requirements is introduced.
- ExecutorInfo info_ = info;
- downgradeResources(info_.mutable_resources());
- CHECK_SOME(state::checkpoint(path, info_));
- }
+ CHECK_SOME(state::checkpoint(path, info));
// Create the meta executor directory.
// NOTE: This creates the 'latest' symlink in the meta directory.
@@ -9049,21 +8982,7 @@ void Executor::checkpointTask(const Task& task)
VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
- {
- // If the checkpointed resources don't have reservation refinements,
- // checkpoint them on the agent in "pre-reservation-refinement" format
- // for backward compatibility with old agents. If downgrading is
- // not possible without losing information, checkpoint the resources in
- // the "post-reservation-refinement" format. We ignore the return
- // value of `downgradeResources` because for now, we checkpoint the result
- // either way.
- //
- // TODO(mpark): Do something smarter with the result once something
- // like agent capability requirements is introduced.
- Task task_ = task;
- downgradeResources(task_.mutable_resources());
- CHECK_SOME(state::checkpoint(path, task_));
- }
+ CHECK_SOME(state::checkpoint(path, task));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index aaf8e5c..01abb50 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -84,20 +84,49 @@ inline Try<Nothing> checkpoint(
}
+template <
+ typename T,
+ typename std::enable_if<
+ std::is_convertible<T*, google::protobuf::Message*>::value,
+ int>::type = 0>
+inline Try<Nothing> checkpoint(const std::string& path, T message)
+{
+ // If the `Try` from `downgradeResources` returns an `Error`, we currently
+ // continue to checkpoint the resources in a partially downgraded state.
+ // This implies that an agent with refined reservations cannot be downgraded
+ // to versions before reservation refinement support, which was introduced
+ // in 1.4.0.
+ //
+ // TODO(mpark): Do something smarter with the result once
+ // something like an agent recovery capability is introduced.
+ downgradeResources(&message);
+ return ::protobuf::write(path, message);
+}
+
+
inline Try<Nothing> checkpoint(
const std::string& path,
- const google::protobuf::Message& message)
+ google::protobuf::RepeatedPtrField<Resource> resources)
{
- return ::protobuf::write(path, message);
+ // If the `Try` from `downgradeResources` returns an `Error`, we currently
+ // continue to checkpoint the resources in a partially downgraded state.
+ // This implies that an agent with refined reservations cannot be downgraded
+ // to versions before reservation refinement support, which was introduced
+ // in 1.4.0.
+ //
+ // TODO(mpark): Do something smarter with the result once
+ // something like an agent recovery capability is introduced.
+ downgradeResources(&resources);
+ return ::protobuf::write(path, resources);
}
-template <typename T>
-Try<Nothing> checkpoint(
+inline Try<Nothing> checkpoint(
const std::string& path,
- const google::protobuf::RepeatedPtrField<T>& messages)
+ const Resources& resources)
{
- return ::protobuf::write(path, messages);
+ const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
+ return checkpoint(path, messages);
}
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index bf2c5fc..387c2ff 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -138,7 +138,7 @@ TEST_F(SlaveStateTest, CheckpointProtobufMessage)
TEST_F(SlaveStateTest, CheckpointRepeatedProtobufMessages)
{
// Checkpoint resources.
- const google::protobuf::RepeatedPtrField<Resource> expected =
+ const Resources expected =
Resources::parse("cpus:2;mem:512;cpus(role):4;mem(role):1024").get();
const string file = "resources-file";
@@ -149,6 +149,11 @@ TEST_F(SlaveStateTest, CheckpointRepeatedProtobufMessages)
ASSERT_SOME(actual);
+ // We convert `actual` back to the "post-reservation-refinement" format
+ // since `state::checkpoint` always downgrades whatever resources
+ // are downgradable.
+ convertResourceFormat(&actual.get(), POST_RESERVATION_REFINEMENT);
+
EXPECT_SOME_EQ(expected, actual);
}