You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/23 22:23:51 UTC

[08/13] mesos git commit: Unschedule directories for GC in parallel in the agent.

Unschedule directories for GC in parallel in the agent.

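Previously, when unscheduling directories for garbage collection,
a dispatch back to the agent was required between each gc result.
Now all of the unschedule calls are issued up front and their
results are collected before the task is run.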

Review: https://reviews.apache.org/r/61649


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d7c62b84
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d7c62b84
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d7c62b84

Branch: refs/heads/master
Commit: d7c62b8413f9018371e6e1dc30a292a54973d19d
Parents: 12026b8
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 14 14:54:22 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 43 ++++++++++++++++++------------------------
 src/slave/slave.hpp       |  4 +---
 src/tests/mock_slave.cpp  |  4 ++--
 src/tests/mock_slave.hpp  |  4 ++--
 src/tests/slave_tests.cpp | 23 +++++++++++-----------
 5 files changed, 35 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ee8187..b5564bb 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1596,14 +1596,6 @@ void Slave::doReliableRegistration(Duration maxBackoff)
 }
 
 
-// Helper to unschedule the path.
-// TODO(vinod): Can we avoid this helper?
-Future<bool> Slave::unschedule(const string& path)
-{
-  return gc->unschedule(path);
-}
-
-
 // TODO(vinod): Instead of crashing the slave on checkpoint errors,
 // send TASK_LOST to the framework.
 void Slave::runTask(
@@ -1742,7 +1734,7 @@ void Slave::run(
     return;
   }
 
-  Future<bool> unschedule = true;
+  list<Future<bool>> unschedules;
 
   // If we are about to create a new framework, unschedule the work
   // and meta directories from getting gc'ed.
@@ -1753,13 +1745,13 @@ void Slave::run(
         flags.work_dir, info.id(), frameworkId);
 
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
 
     // Unschedule framework meta directory.
     path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
 
     Option<UPID> frameworkPid = None();
@@ -1836,31 +1828,31 @@ void Slave::run(
         flags.work_dir, info.id(), frameworkId, executorId);
 
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
 
     // Unschedule executor meta directory.
     path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
 
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
   }
 
   // Run the task after the unschedules are done.
-  unschedule.onAny(defer(
-      self(),
-      &Self::_run,
-      lambda::_1,
-      frameworkInfo,
-      executorInfo,
-      task,
-      taskGroup));
+  collect(unschedules)
+    .onAny(defer(self(),
+                 &Self::_run,
+                 lambda::_1,
+                 frameworkInfo,
+                 executorInfo,
+                 task,
+                 taskGroup));
 }
 
 
 void Slave::_run(
-    const Future<bool>& future,
+    const Future<list<bool>>& unschedules,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
@@ -1933,11 +1925,12 @@ void Slave::_run(
     return;
   }
 
-  CHECK(!future.isDiscarded());
+  CHECK(!unschedules.isDiscarded());
 
-  if (!future.isReady()) {
+  if (!unschedules.isReady()) {
     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
-               << (future.isFailed() ? future.failure() : "future discarded");
+               << (unschedules.isFailed() ?
+                   unschedules.failure() : "future discarded");
 
     // We report TASK_DROPPED to the framework because the task was
     // never launched. For non-partition-aware frameworks, we report

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 28fa671..80fb1ab 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -158,7 +158,7 @@ public:
 
   // Made 'virtual' for Slave mocking.
   virtual void _run(
-      const process::Future<bool>& future,
+      const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
@@ -171,8 +171,6 @@ public:
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroupInfo);
 
-  process::Future<bool> unschedule(const std::string& path);
-
   // Made 'virtual' for Slave mocking.
   virtual void killTask(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index c435ec7..db24f9e 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -178,14 +178,14 @@ void MockSlave::unmocked_runTask(
 
 
 void MockSlave::unmocked__run(
-    const Future<bool>& future,
+    const Future<list<bool>>& unschedules,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo,
     const Option<TaskGroupInfo>& taskGroup)
 {
   slave::Slave::_run(
-      future, frameworkInfo, executorInfo, taskInfo, taskGroup);
+      unschedules, frameworkInfo, executorInfo, taskInfo, taskGroup);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 767ed3d..57189ce 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -132,14 +132,14 @@ public:
       const TaskInfo& task);
 
   MOCK_METHOD5(_run, void(
-      const process::Future<bool>& future,
+      const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup));
 
   void unmocked__run(
-      const process::Future<bool>& future,
+      const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 57679bd..1bdadce 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -122,6 +122,7 @@ using process::http::Unauthorized;
 
 using process::http::authentication::Principal;
 
+using std::list;
 using std::map;
 using std::shared_ptr;
 using std::string;
@@ -4040,7 +4041,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask));
 
   // Saved arguments from Slave::_run().
-  Future<bool> future;
+  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo;
   Option<TaskGroupInfo> taskGroup;
@@ -4051,7 +4052,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   Future<Nothing> _run;
   EXPECT_CALL(slave, _run(_, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&future),
+                    SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo),
                     SaveArg<3>(&task_),
@@ -4080,7 +4081,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
-      future, frameworkInfo, executorInfo, task_, taskGroup);
+      unschedules, frameworkInfo, executorInfo, task_, taskGroup);
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
@@ -4154,7 +4155,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   // Skip what Slave::_run() normally does, save its arguments for
   // later, tie reaching the critical moment when to kill the task to
   // a future.
-  Future<bool> future1, future2;
+  Future<list<bool>> unschedules1, unschedules2;
   FrameworkInfo frameworkInfo1, frameworkInfo2;
   ExecutorInfo executorInfo1, executorInfo2;
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
@@ -4163,13 +4164,13 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   Future<Nothing> _run1, _run2;
   EXPECT_CALL(slave, _run(_, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
-                    SaveArg<0>(&future1),
+                    SaveArg<0>(&unschedules1),
                     SaveArg<1>(&frameworkInfo1),
                     SaveArg<2>(&executorInfo1),
                     SaveArg<3>(&task_1),
                     SaveArg<4>(&taskGroup1)))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
-                    SaveArg<0>(&future2),
+                    SaveArg<0>(&unschedules2),
                     SaveArg<1>(&frameworkInfo2),
                     SaveArg<2>(&executorInfo2),
                     SaveArg<3>(&task_2),
@@ -4207,10 +4208,10 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
 
   // The `__run` continuations should have no effect.
   slave.unmocked__run(
-      future1, frameworkInfo1, executorInfo1, task_1, taskGroup1);
+      unschedules1, frameworkInfo1, executorInfo1, task_1, taskGroup1);
 
   slave.unmocked__run(
-      future2, frameworkInfo2, executorInfo2, task_2, taskGroup2);
+      unschedules2, frameworkInfo2, executorInfo2, task_2, taskGroup2);
 
   Clock::settle();
 
@@ -7036,7 +7037,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTaskGroup));
 
   // Saved arguments from `Slave::_run()`.
-  Future<bool> future;
+  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo_;
   Option<TaskGroupInfo> taskGroup_;
@@ -7048,7 +7049,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   Future<Nothing> _run;
   EXPECT_CALL(slave, _run(_, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&future),
+                    SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo_),
                     SaveArg<3>(&task_),
@@ -7120,7 +7121,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
-      future, frameworkInfo, executorInfo_, task_, taskGroup_);
+      unschedules, frameworkInfo, executorInfo_, task_, taskGroup_);
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);