You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2018/01/03 07:01:45 UTC

[4/4] mesos git commit: Used the protobuf-reflection-based 'downgradeResources' utility.

Used the protobuf-reflection-based 'downgradeResources' utility.

Review: https://reviews.apache.org/r/63977


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e9e65674
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e9e65674
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e9e65674

Branch: refs/heads/master
Commit: e9e656747b4da88498e07b44f6bc5fee2a64dad8
Parents: db8984f
Author: Michael Park <mp...@apache.org>
Authored: Mon Dec 11 16:09:41 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Jan 2 22:42:44 2018 -0800

----------------------------------------------------------------------
 src/master/master.cpp              |  43 +++-------
 src/slave/slave.cpp                | 137 +++++++-------------------------
 src/slave/state.hpp                |  41 ++++++++--
 src/tests/slave_recovery_tests.cpp |   7 +-
 4 files changed, 82 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index bba70c3..282fdf8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5004,20 +5004,13 @@ void Master::_accept(
                       slave->info));
             }
 
-            // If the agent does not support reservation refinement,
-            // downgrade the task and executor resources to the
-            // "pre-reservation-refinement" format. This cannot fail
-            // since the master rejects attempts to create refined
-            // reservations on non-capable agents.
+            // If the agent does not support reservation refinement, downgrade
+            // the task / executor resources to the "pre-reservation-refinement"
+            // format. The message cannot contain any refined reservations since
+            // the master rejects attempts to create refined reservations
+            // on non-capable agents.
             if (!slave->capabilities.reservationRefinement) {
-              TaskInfo& task = *message.mutable_task();
-
-              CHECK_SOME(downgradeResources(task.mutable_resources()));
-
-              if (task.has_executor()) {
-                CHECK_SOME(downgradeResources(
-                    task.mutable_executor()->mutable_resources()));
-              }
+              CHECK_SOME(downgradeResources(&message));
             }
 
             // TODO(bmahler): Consider updating this log message to
@@ -5208,21 +5201,11 @@ void Master::_accept(
 
         // If the agent does not support reservation refinement, downgrade
         // the task and executor resources to the "pre-reservation-refinement"
-        // format. This cannot fail since the master rejects attempts to
-        // create refined reservations on non-capable agents.
+        // format. The message cannot contain any refined reservations since
+        // the master rejects attempts to create refined reservations
+        // on non-capable agents.
         if (!slave->capabilities.reservationRefinement) {
-          CHECK_SOME(downgradeResources(
-              message.mutable_executor()->mutable_resources()));
-
-          foreach (
-              TaskInfo& task, *message.mutable_task_group()->mutable_tasks()) {
-            CHECK_SOME(downgradeResources(task.mutable_resources()));
-
-            if (task.has_executor()) {
-              CHECK_SOME(downgradeResources(
-                  task.mutable_executor()->mutable_resources()));
-            }
-          }
+          CHECK_SOME(downgradeResources(&message));
         }
 
         LOG(INFO) << "Launching task group " << stringify(taskIds)
@@ -7148,9 +7131,9 @@ void Master::___reregisterSlave(
       //
       // TODO(neilc): It would probably be better to prevent the agent
       // from re-registering in this scenario.
-      Try<Nothing> result = downgradeResources(message.mutable_resources());
+      Try<Nothing> result = downgradeResources(&message);
       if (result.isError()) {
-        LOG(WARNING) << "Not sending updated checkpointed resouces "
+        LOG(WARNING) << "Not sending updated checkpointed resources "
                      << slave->checkpointedResources
                      << " with refined reservations, since agent " << *slave
                      << " is not RESERVATION_REFINEMENT-capable.";
@@ -10653,7 +10636,7 @@ void Master::_apply(
       //
       // TODO(neilc): It would probably be better to prevent the agent
       // from re-registering in this scenario.
-      Try<Nothing> result = downgradeResources(message.mutable_resources());
+      Try<Nothing> result = downgradeResources(&message);
       if (result.isError()) {
         LOG(WARNING) << "Not sending updated checkpointed resources "
                      << slave->checkpointedResources

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ef2394..cfb675d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1294,17 +1294,7 @@ void Slave::registered(
 
       VLOG(1) << "Checkpointing SlaveInfo to '" << path << "'";
 
-      {
-        // The `SlaveInfo.resources` does not include dynamic reservations,
-        // which means it cannot contain reservation refinements, so
-        // `downgradeResources` should always succeed.
-        SlaveInfo info_ = info;
-
-        Try<Nothing> result = downgradeResources(info_.mutable_resources());
-        CHECK_SOME(result);
-
-        CHECK_SOME(state::checkpoint(path, info_));
-      }
+      CHECK_SOME(state::checkpoint(path, info));
 
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
@@ -1531,43 +1521,31 @@ void Slave::doReliableRegistration(Duration maxBackoff)
   // See MESOS-5330.
   link(master.get());
 
-  SlaveInfo slaveInfo = info;
-
-  // The `SlaveInfo.resources` does not include dynamic reservations,
-  // which means it cannot contain reservation refinements, so
-  // `downgradeResources` should always succeed.
-  Try<Nothing> result = downgradeResources(slaveInfo.mutable_resources());
-  CHECK_SOME(result);
-
-  RepeatedPtrField<Resource> checkpointedResources_ = checkpointedResources;
-
-  // If the checkpointed resources don't have reservation refinements,
-  // send them to the master in "pre-reservation-refinement" format
-  // for backward compatibility with old masters. If downgrading is
-  // not possible without losing information, send the resources in
-  // the "post-reservation-refinement" format. We ignore the return
-  // value of `downgradeResources` because for now, we send the result
-  // either way.
-  //
-  // TODO(mpark): Do something smarter with the result once something
-  // like a master capability is introduced.
-  downgradeResources(&checkpointedResources_);
-
-  if (!slaveInfo.has_id()) {
+  if (!info.has_id()) {
     // Registering for the first time.
     RegisterSlaveMessage message;
     message.set_version(MESOS_VERSION);
-    message.mutable_slave()->CopyFrom(slaveInfo);
+    message.mutable_slave()->CopyFrom(info);
 
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
     // Include checkpointed resources.
-    message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
+    message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
 
     message.mutable_resource_version_uuid()->set_value(
         resourceVersion.toBytes());
 
+    // If the `Try` from `downgradeResources` returns an `Error`, we currently
+    // continue to send the resources to the master in a partially downgraded
+    // state. This implies that an agent with refined reservations cannot work
+    // with versions of master before reservation refinement support, which was
+    // introduced in 1.4.0.
+    //
+    // TODO(mpark): Do something smarter with the result once something
+    //              like a master capability is introduced.
+    downgradeResources(&message);
+
     send(master.get(), message);
   } else {
     // Re-registering, so send tasks running.
@@ -1578,12 +1556,12 @@ void Slave::doReliableRegistration(Duration maxBackoff)
         capabilities.toRepeatedPtrField());
 
     // Include checkpointed resources.
-    message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
+    message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
 
     message.mutable_resource_version_uuid()->set_value(
         resourceVersion.toBytes());
 
-    message.mutable_slave()->CopyFrom(slaveInfo);
+    message.mutable_slave()->CopyFrom(info);
 
     foreachvalue (Framework* framework, frameworks) {
       message.add_frameworks()->CopyFrom(framework->info);
@@ -1671,29 +1649,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
       }
     }
 
-    // If the resources don't have reservation refinements, send them
-    // to the master in "pre-reservation-refinement" format for backward
-    // compatibility with old masters. If downgrading is not possible
-    // without losing information, send the resources in the
-    // "post-reservation-refinement" format. We ignore the return value of
-    // `downgradeResources` because for now, we send the result either way.
+    // If the `Try` from `downgradeResources` returns an `Error`, we currently
+    // continue to send the resources to the master in a partially downgraded
+    // state. This implies that an agent with refined reservations cannot work
+    // with versions of master before reservation refinement support, which was
+    // introduced in 1.4.0.
     //
     // TODO(mpark): Do something smarter with the result once something
     // like a master capability is introduced.
-    foreach (Task& task, *message.mutable_tasks()) {
-      downgradeResources(task.mutable_resources());
-    }
-
-    foreach (ExecutorInfo& executor, *message.mutable_executor_infos()) {
-      downgradeResources(executor.mutable_resources());
-    }
-
-    foreach (Archive::Framework& completedFramework,
-             *message.mutable_completed_frameworks()) {
-      foreach (Task& task, *completedFramework.mutable_tasks()) {
-        downgradeResources(task.mutable_resources());
-      }
-    }
+    downgradeResources(&message);
 
     CHECK_SOME(master);
     send(master.get(), message);
@@ -3683,27 +3647,10 @@ void Slave::checkpointResources(
   // we avoid a case of inconsistency between the master and the agent if
   // the agent restarts during handling of CheckpointResourcesMessage.
 
-  {
-    // If the checkpointed resources don't have reservation refinements,
-    // checkpoint them on the agent in "pre-reservation-refinement" format
-    // for backward compatibility with old agents. If downgrading is
-    // not possible without losing information, checkpoint the resources in
-    // the "post-reservation-refinement" format. We ignore the return
-    // value of `downgradeResources` because for now, we checkpoint the result
-    // either way.
-    //
-    // TODO(mpark): Do something smarter with the result once something
-    // like agent capability requirements is introduced.
-    RepeatedPtrField<Resource> newCheckpointedResources_ =
-      newCheckpointedResources;
-
-    downgradeResources(&newCheckpointedResources_);
-
-    CHECK_SOME(state::checkpoint(
-        paths::getResourcesTargetPath(metaDir),
-        newCheckpointedResources_))
-      << "Failed to checkpoint resources target " << newCheckpointedResources_;
-  }
+  CHECK_SOME(state::checkpoint(
+      paths::getResourcesTargetPath(metaDir),
+      newCheckpointedResources))
+    << "Failed to checkpoint resources target " << newCheckpointedResources;
 
   Try<Nothing> syncResult = syncCheckpointedResources(
       newCheckpointedResources);
@@ -9006,21 +8953,7 @@ void Executor::checkpointExecutor()
 
   VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
 
-  {
-    // If the checkpointed resources don't have reservation refinements,
-    // checkpoint them on the agent in "pre-reservation-refinement" format
-    // for backward compatibility with old agents. If downgrading is
-    // not possible without losing information, checkpoint the resources in
-    // the "post-reservation-refinement" format. We ignore the return
-    // value of `downgradeResources` because for now, we checkpoint the result
-    // either way.
-    //
-    // TODO(mpark): Do something smarter with the result once something
-    // like agent capability requirements is introduced.
-    ExecutorInfo info_ = info;
-    downgradeResources(info_.mutable_resources());
-    CHECK_SOME(state::checkpoint(path, info_));
-  }
+  CHECK_SOME(state::checkpoint(path, info));
 
   // Create the meta executor directory.
   // NOTE: This creates the 'latest' symlink in the meta directory.
@@ -9049,21 +8982,7 @@ void Executor::checkpointTask(const Task& task)
 
   VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
 
-  {
-    // If the checkpointed resources don't have reservation refinements,
-    // checkpoint them on the agent in "pre-reservation-refinement" format
-    // for backward compatibility with old agents. If downgrading is
-    // not possible without losing information, checkpoint the resources in
-    // the "post-reservation-refinement" format. We ignore the return
-    // value of `downgradeResources` because for now, we checkpoint the result
-    // either way.
-    //
-    // TODO(mpark): Do something smarter with the result once something
-    // like agent capability requirements is introduced.
-    Task task_ = task;
-    downgradeResources(task_.mutable_resources());
-    CHECK_SOME(state::checkpoint(path, task_));
-  }
+  CHECK_SOME(state::checkpoint(path, task));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index aaf8e5c..01abb50 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -84,20 +84,49 @@ inline Try<Nothing> checkpoint(
 }
 
 
+template <
+    typename T,
+    typename std::enable_if<
+        std::is_convertible<T*, google::protobuf::Message*>::value,
+        int>::type = 0>
+inline Try<Nothing> checkpoint(const std::string& path, T message)
+{
+  // If the `Try` from `downgradeResources` returns an `Error`, we currently
+  // continue to checkpoint the resources in a partially downgraded state.
+  // This implies that an agent with refined reservations cannot be downgraded
+  // to versions before reservation refinement support, which was introduced
+  // in 1.4.0.
+  //
+  // TODO(mpark): Do something smarter with the result once
+  // something like an agent recovery capability is introduced.
+  downgradeResources(&message);
+  return ::protobuf::write(path, message);
+}
+
+
 inline Try<Nothing> checkpoint(
     const std::string& path,
-    const google::protobuf::Message& message)
+    google::protobuf::RepeatedPtrField<Resource> resources)
 {
-  return ::protobuf::write(path, message);
+  // If the `Try` from `downgradeResources` returns an `Error`, we currently
+  // continue to checkpoint the resources in a partially downgraded state.
+  // This implies that an agent with refined reservations cannot be downgraded
+  // to versions before reservation refinement support, which was introduced
+  // in 1.4.0.
+  //
+  // TODO(mpark): Do something smarter with the result once
+  // something like an agent recovery capability is introduced.
+  downgradeResources(&resources);
+  return ::protobuf::write(path, resources);
 }
 
 
-template <typename T>
-Try<Nothing> checkpoint(
+inline Try<Nothing> checkpoint(
     const std::string& path,
-    const google::protobuf::RepeatedPtrField<T>& messages)
+    const Resources& resources)
 {
-  return ::protobuf::write(path, messages);
+  const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
+  return checkpoint(path, messages);
 }
 
 }  // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9e65674/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index bf2c5fc..387c2ff 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -138,7 +138,7 @@ TEST_F(SlaveStateTest, CheckpointProtobufMessage)
 TEST_F(SlaveStateTest, CheckpointRepeatedProtobufMessages)
 {
   // Checkpoint resources.
-  const google::protobuf::RepeatedPtrField<Resource> expected =
+  const Resources expected =
     Resources::parse("cpus:2;mem:512;cpus(role):4;mem(role):1024").get();
 
   const string file = "resources-file";
@@ -149,6 +149,11 @@ TEST_F(SlaveStateTest, CheckpointRepeatedProtobufMessages)
 
   ASSERT_SOME(actual);
 
+  // We convert `actual` back to the "post-reservation-refinement"
+  // format since `state::checkpoint` always downgrades whatever
+  // resources are downgradable.
+  convertResourceFormat(&actual.get(), POST_RESERVATION_REFINEMENT);
+
   EXPECT_SOME_EQ(expected, actual);
 }