You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ti...@apache.org on 2016/01/19 00:47:26 UTC

mesos git commit: Replaced mutex in HTTP server for fetcher cache tests with latch.

Repository: mesos
Updated Branches:
  refs/heads/master 4e98abe58 -> 5c1e0170f


Replaced mutex in HTTP server for fetcher cache tests with latch.

Also inlined the function that awaits fetch contention.

This mutex was prone to causing races at task startup by firmly
blocking an internal libprocess thread. The latch avoids this.

Failing to launch a task due to such a race was not flagged by a
directly related test failure, because the AWAIT meant to catch this
situation was ineffective, having been placed inside a function called
from the test. Only the subsequent wait for task completion triggered a
test failure, and by then it was unclear what exactly had happened.

Review: https://reviews.apache.org/r/42149/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c1e0170
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c1e0170
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c1e0170

Branch: refs/heads/master
Commit: 5c1e0170fb13029dbd257cbef556f76bec1f1493
Parents: 4e98abe
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Tue Jan 19 00:43:05 2016 +0100
Committer: Till Toenshoff <to...@me.com>
Committed: Tue Jan 19 00:45:04 2016 +0100

----------------------------------------------------------------------
 src/tests/fetcher_cache_tests.cpp | 111 +++++++++++++++++----------------
 1 file changed, 57 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5c1e0170/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index 1fb1e21..2747b72 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -17,7 +17,6 @@
 #include <unistd.h>
 
 #include <list>
-#include <mutex>
 #include <string>
 #include <vector>
 
@@ -31,6 +30,7 @@
 #include <process/collect.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/latch.hpp>
 #include <process/message.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
@@ -71,6 +71,7 @@ using mesos::internal::slave::FetcherProcess;
 
 using process::Future;
 using process::HttpEvent;
+using process::Latch;
 using process::Owned;
 using process::PID;
 using process::Process;
@@ -143,8 +144,9 @@ protected:
 
   Try<vector<Task>> launchTasks(const vector<CommandInfo>& commandInfos);
 
-  // Waits until FetcherProcess::run() has been called for all tasks.
-  void awaitFetchContention();
+  // Promises whose futures indicate that FetcherProcess::_fetch() has been
+  // called for a task with a given index.
+  vector<Owned<Promise<Nothing>>> fetchContentionWaypoints;
 
   string assetsDirectory;
   string commandPath;
@@ -164,10 +166,6 @@ private:
 
   FrameworkID frameworkId;
 
-  // Promises whose futures indicate that FetcherProcess::_fetch() has been
-  // called for a task with a given index.
-  vector<Owned<Promise<Nothing>>> fetchContentionWaypoints;
-
   // If this test did not succeed as indicated by the above variable,
   // the contents of these sandboxes will be dumped during tear down.
   vector<Path> sandboxes;
@@ -590,16 +588,6 @@ Try<vector<FetcherCacheTest::Task>> FetcherCacheTest::launchTasks(
 }
 
 
-// Ensure that FetcherProcess::_fetch() has been called for each task,
-// which means that all tasks are competing for downloading the same URIs.
-void FetcherCacheTest::awaitFetchContention()
-{
-  foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
-    AWAIT(waypoint->future());
-  }
-}
-
-
 // Tests fetching from the local asset directory without cache. This
 // gives us a baseline for the following tests and lets us debug our
 // test infrastructure without extra complications.
@@ -807,13 +795,22 @@ public:
   class HttpServer : public Process<HttpServer>
   {
   public:
-    HttpServer(FetcherCacheHttpTest* test)
+  public:
+    HttpServer(const string& _commandPath, const string& _archivePath)
       : countRequests(0),
         countCommandRequests(0),
-        countArchiveRequests(0)
+        countArchiveRequests(0),
+        commandPath(_commandPath),
+        archivePath(_archivePath)
+    {
+      CHECK(!_commandPath.empty());
+      CHECK(!_archivePath.empty());
+    }
+
+    virtual void initialize()
     {
-      provide(COMMAND_NAME, test->commandPath);
-      provide(ARCHIVE_NAME, test->archivePath);
+      provide(COMMAND_NAME, commandPath);
+      provide(ARCHIVE_NAME, archivePath);
     }
 
     string url()
@@ -821,40 +818,40 @@ public:
       return "http://" + stringify(self().address) + "/" + self().id + "/";
     }
 
-    // Stalls the execution of HTTP requests inside visit().
+    // Stalls the execution of future HTTP requests inside visit().
     void pause()
     {
-      mutex.lock();
+      // If there is no latch or if the existing latch has already been
+      // triggered, create a new latch.
+      if (latch.get() == nullptr || latch->await(Duration::min())) {
+        latch.reset(new Latch());
+      }
     }
 
     void resume()
     {
-      mutex.unlock();
+      if (latch.get() != nullptr) {
+        latch->trigger();
+      }
     }
 
     virtual void visit(const HttpEvent& event)
     {
-      // TODO(bernd-mesos): Don't use locks here because we'll
-      // actually block libprocess threads which could cause a
-      // deadlock if we have a test with too many requests that we
-      // don't have enough threads to run other actors! Instead,
-      // consider asynchronously deferring the actual execution of
-      // this function via a Queue. This is currently non-trivial
-      // because we can't copy an HttpEvent so we're _forced_ to block
-      // the thread synchronously.
-      synchronized (mutex) {
-        countRequests++;
-
-        if (strings::contains(event.request->url.path, COMMAND_NAME)) {
-          countCommandRequests++;
-        }
-
-        if (strings::contains(event.request->url.path, ARCHIVE_NAME)) {
-          countArchiveRequests++;
-        }
-
-        ProcessBase::visit(event);
+      if (latch.get() != nullptr) {
+        latch->await();
+      }
+
+      countRequests++;
+
+      if (strings::contains(event.request->url.path, COMMAND_NAME)) {
+        countCommandRequests++;
+      }
+
+      if (strings::contains(event.request->url.path, ARCHIVE_NAME)) {
+        countArchiveRequests++;
       }
+
+      ProcessBase::visit(event);
     }
 
     void resetCounts()
@@ -869,14 +866,16 @@ public:
     size_t countArchiveRequests;
 
   private:
-    std::mutex mutex;
+    const string commandPath;
+    const string archivePath;
+    Owned<Latch> latch;
   };
 
   virtual void SetUp()
   {
     FetcherCacheTest::SetUp();
 
-    httpServer = new HttpServer(this);
+    httpServer = new HttpServer(commandPath, archivePath);
     spawn(httpServer);
   }
 
@@ -974,10 +973,12 @@ TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent)
 
   CHECK_EQ(countTasks, tasks.get().size());
 
-  // Given pausing the HTTP server, this proves that fetch contention
-  // has happened. All tasks have passed the point where it occurs,
-  // but they are not running yet.
-  awaitFetchContention();
+  // Having paused the HTTP server, ensure that FetcherProcess::_fetch()
+  // has been called for each task, which means that all tasks are competing
+  // for downloading the same URIs.
+  foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
+    AWAIT(waypoint->future());
+  }
 
   // Now let the tasks run.
   httpServer->resume();
@@ -1081,10 +1082,12 @@ TEST_F(FetcherCacheHttpTest, HttpMixed)
 
   CHECK_EQ(3u, tasks.get().size());
 
-  // Given pausing the HTTP server, this proves that fetch contention
-  // has happened. All tasks have passed the point where it occurs,
-  // but they are not running yet.
-  awaitFetchContention();
+  // Having paused the HTTP server, ensure that FetcherProcess::_fetch()
+  // has been called for each task, which means that all tasks are competing
+  // for downloading the same URIs.
+  foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
+    AWAIT(waypoint->future());
+  }
 
   // Now let the tasks run.
   httpServer->resume();