You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/12/08 00:03:46 UTC

[2/3] mesos git commit: Eliminated some copying of tasks / executors in agent re-registration.

Eliminated some copying of tasks / executors in agent re-registration.

Review: https://reviews.apache.org/r/64428


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4fe3bdb8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4fe3bdb8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4fe3bdb8

Branch: refs/heads/master
Commit: 4fe3bdb8ed5e8d4ddc894ff7fd5cbcd3183526be
Parents: 108eb06
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Dec 7 12:03:58 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Dec 7 16:03:23 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 79 +++++++++++++++++++++++++++-------------------
 src/master/master.hpp |  9 +++---
 2 files changed, 51 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4fe3bdb8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1d192db..584398c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -19,6 +19,7 @@
 #include <algorithm>
 #include <cctype>
 #include <fstream>
+#include <functional>
 #include <iomanip>
 #include <list>
 #include <memory>
@@ -98,6 +99,7 @@
 using google::protobuf::RepeatedPtrField;
 
 using std::list;
+using std::reference_wrapper;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -6318,7 +6320,7 @@ void Master::__registerSlave(
 
   ++metrics->slave_registrations;
 
-  addSlave(slave);
+  addSlave(slave, {});
 
   Duration pingTimeout =
     flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
@@ -6721,16 +6723,6 @@ void Master::__reregisterSlave(
     }
   };
 
-  vector<Resource> checkpointedResources =
-    google::protobuf::convert(reregisterSlaveMessage.checkpointed_resources());
-  vector<ExecutorInfo> executorInfos =
-    google::protobuf::convert(reregisterSlaveMessage.executor_infos());
-  vector<Task> tasks =
-    google::protobuf::convert(reregisterSlaveMessage.tasks());
-  const vector<FrameworkInfo> frameworks =
-    google::protobuf::convert(reregisterSlaveMessage.frameworks());
-  vector<Archive::Framework> completedFrameworks =
-    google::protobuf::convert(reregisterSlaveMessage.completed_frameworks());
   vector<SlaveInfo::Capability> agentCapabilities =
     google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
 
@@ -6740,43 +6732,47 @@ void Master::__reregisterSlave(
 
   // If the agent is not multi-role capable, inject allocation info.
   if (!slaveCapabilities.multiRole) {
-    hashmap<FrameworkID, FrameworkInfo> frameworks_;
-    foreach (const FrameworkInfo& framework, frameworks) {
-      frameworks_[framework.id()] = framework;
+    hashmap<FrameworkID, reference_wrapper<const FrameworkInfo>> frameworks;
+
+    foreach (const FrameworkInfo& framework,
+             reregisterSlaveMessage.frameworks()) {
+      frameworks.emplace(framework.id(), framework);
     }
 
-    foreach (Task& task, tasks) {
-      CHECK(frameworks_.contains(task.framework_id()));
+    foreach (Task& task, *reregisterSlaveMessage.mutable_tasks()) {
+      CHECK(frameworks.contains(task.framework_id()));
 
       injectAllocationInfo(
           task.mutable_resources(),
-          frameworks_.at(task.framework_id()));
+          frameworks.at(task.framework_id()));
     }
 
-    foreach (ExecutorInfo& executor, executorInfos) {
-      CHECK(frameworks_.contains(executor.framework_id()));
+    foreach (ExecutorInfo& executor,
+             *reregisterSlaveMessage.mutable_executor_infos()) {
+      CHECK(frameworks.contains(executor.framework_id()));
 
       injectAllocationInfo(
           executor.mutable_resources(),
-          frameworks_.at(executor.framework_id()));
+          frameworks.at(executor.framework_id()));
     }
   }
 
   // Currently, The agent always downgrades the resources such that
   // a 1.4.0 agent can speak to a pre-1.4.0 master. We therefore
   // unconditionally upgrade the resources back here.
-  foreach (Task& task, tasks) {
+  foreach (Task& task, *reregisterSlaveMessage.mutable_tasks()) {
     convertResourceFormat(
         task.mutable_resources(), POST_RESERVATION_REFINEMENT);
   }
 
-  foreach (ExecutorInfo& executor, executorInfos) {
+  foreach (ExecutorInfo& executor,
+           *reregisterSlaveMessage.mutable_executor_infos()) {
     convertResourceFormat(
         executor.mutable_resources(), POST_RESERVATION_REFINEMENT);
   }
 
   foreach (Archive::Framework& completedFramework,
-           completedFrameworks) {
+           *reregisterSlaveMessage.mutable_completed_frameworks()) {
     foreach (Task& task, *completedFramework.mutable_tasks()) {
       convertResourceFormat(
           task.mutable_resources(), POST_RESERVATION_REFINEMENT);
@@ -6791,7 +6787,8 @@ void Master::__reregisterSlave(
   // re-registering agent that are partition-aware.
   hashset<FrameworkID> partitionAwareFrameworks;
 
-  foreach (const FrameworkInfo& framework, frameworks) {
+  foreach (const FrameworkInfo& framework,
+           reregisterSlaveMessage.frameworks()) {
     if (protobuf::frameworkHasCapability(
             framework, FrameworkInfo::Capability::PARTITION_AWARE)) {
       partitionAwareFrameworks.insert(framework.id());
@@ -6802,7 +6799,7 @@ void Master::__reregisterSlave(
   // master (those tasks were previously marked "unreachable", so they
   // should be removed from that collection).
   vector<Task> recoveredTasks;
-  foreach (Task& task, tasks) {
+  foreach (Task& task, *reregisterSlaveMessage.mutable_tasks()) {
     const FrameworkID& frameworkId = task.framework_id();
 
     // Don't re-add tasks whose framework has been shutdown at the
@@ -6819,6 +6816,11 @@ void Master::__reregisterSlave(
     recoveredTasks.push_back(std::move(task));
   }
 
+  vector<Resource> checkpointedResources = google::protobuf::convert(
+      std::move(*reregisterSlaveMessage.mutable_checkpointed_resources()));
+  vector<ExecutorInfo> executorInfos = google::protobuf::convert(
+      std::move(*reregisterSlaveMessage.mutable_executor_infos()));
+
   Slave* slave = new Slave(
       this,
       slaveInfo,
@@ -6840,7 +6842,10 @@ void Master::__reregisterSlave(
   slaves.removed.erase(slave->id);
   slaves.unreachable.erase(slave->id);
 
-  addSlave(slave, completedFrameworks);
+  vector<Archive::Framework> completedFrameworks = google::protobuf::convert(
+      std::move(*reregisterSlaveMessage.mutable_completed_frameworks()));
+
+  addSlave(slave, std::move(completedFrameworks));
 
   Duration pingTimeout =
     flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
@@ -6865,7 +6870,8 @@ void Master::__reregisterSlave(
   // lost when the master fails over. Also, we only store a limited
   // number of completed frameworks. A proper fix likely involves
   // storing framework information in the registry (MESOS-1719).
-  foreach (const FrameworkInfo& framework, frameworks) {
+  foreach (const FrameworkInfo& framework,
+           reregisterSlaveMessage.frameworks()) {
     if (isCompletedFramework(framework.id())) {
       LOG(INFO) << "Shutting down framework " << framework.id()
                 << " at re-registered agent " << *slave
@@ -6877,6 +6883,12 @@ void Master::__reregisterSlave(
     }
   }
 
+  // TODO(bmahler): Consider moving this conversion into
+  // `updateSlaveFrameworks`; that would help when there is a large
+  // total number of frameworks in the cluster.
+  const vector<FrameworkInfo> frameworks = google::protobuf::convert(
+      std::move(*reregisterSlaveMessage.mutable_frameworks()));
+
   updateSlaveFrameworks(slave, frameworks);
 
   slaves.reregistering.erase(slaveInfo.id());
@@ -7066,6 +7078,9 @@ void Master::updateSlaveFrameworks(
     Framework* framework = getFramework(frameworkInfo.id());
 
     if (framework != nullptr) {
+      // TODO(bmahler): Copying the framework info here can be
+      // expensive; consider only sending this message when
+      // there has been a change versus what the agent reported.
       UpdateFrameworkMessage message;
       message.mutable_framework_id()->CopyFrom(framework->id());
       message.mutable_framework_info()->CopyFrom(framework->info);
@@ -9433,7 +9448,7 @@ void Master::removeFramework(Framework* framework)
     CHECK(!slaves.registered.contains(task->slave_id()));
 
     // Move task from unreachable map to completed map.
-    framework->addCompletedTask(*task);
+    framework->addCompletedTask(std::move(*task));
     framework->unreachableTasks.erase(taskId);
   }
 
@@ -9554,7 +9569,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
 
 void Master::addSlave(
     Slave* slave,
-    const vector<Archive::Framework>& completedFrameworks)
+    vector<Archive::Framework>&& completedFrameworks)
 {
   CHECK_NOTNULL(slave);
   CHECK(!slaves.registered.contains(slave->id));
@@ -9628,16 +9643,16 @@ void Master::addSlave(
   //
   // TODO(vinod): Reconcile the notion of a completed framework across
   // the master and slave.
-  foreach (const Archive::Framework& completedFramework, completedFrameworks) {
+  foreach (Archive::Framework& completedFramework, completedFrameworks) {
     Framework* framework = getFramework(
         completedFramework.framework_info().id());
 
-    foreach (const Task& task, completedFramework.tasks()) {
+    foreach (Task& task, *completedFramework.mutable_tasks()) {
       if (framework != nullptr) {
         VLOG(2) << "Re-adding completed task " << task.task_id()
                 << " of framework " << *framework
                 << " that ran on agent " << *slave;
-        framework->addCompletedTask(task);
+        framework->addCompletedTask(std::move(task));
       } else {
         // The framework might not be re-registered yet.
         //

http://git-wip-us.apache.org/repos/asf/mesos/blob/4fe3bdb8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2658312..1f5daae 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -710,8 +710,7 @@ protected:
   // Add a slave.
   void addSlave(
       Slave* slave,
-      const std::vector<Archive::Framework>& completedFrameworks =
-        std::vector<Archive::Framework>());
+      std::vector<Archive::Framework>&& completedFrameworks);
 
   void _markUnreachable(
       Slave* slave,
@@ -2308,14 +2307,14 @@ struct Framework
     }
   }
 
-  void addCompletedTask(const Task& task)
+  void addCompletedTask(Task&& task)
   {
     // TODO(neilc): We currently allow frameworks to reuse the task
     // IDs of completed tasks (although this is discouraged). This
     // means that there might be multiple completed tasks with the
     // same task ID. We should consider rejecting attempts to reuse
     // task IDs (MESOS-6779).
-    completedTasks.push_back(process::Owned<Task>(new Task(task)));
+    completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
   }
 
   void addUnreachableTask(const Task& task)
@@ -2343,7 +2342,7 @@ struct Framework
         << task->state();
       addUnreachableTask(*task);
     } else {
-      addCompletedTask(*task);
+      addCompletedTask(Task(*task));
     }
 
     tasks.erase(task->task_id());