You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/21 17:04:31 UTC

[1/4] mesos git commit: Fix ProcTest.MultipleThreads hanging.

Repository: mesos
Updated Branches:
  refs/heads/master 993dc2dc5 -> da7b4ac6c


Fix ProcTest.MultipleThreads hanging.

Replace 'sleep' with 'pthread_testcancel'.

Review: https://reviews.apache.org/r/28137


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c6d4311
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c6d4311
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c6d4311

Branch: refs/heads/master
Commit: 3c6d43117dd832a992343cfe46893b56517d8ffa
Parents: 993dc2d
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:02:37 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:02:38 2014 -0800

----------------------------------------------------------------------
 .../libprocess/3rdparty/stout/tests/proc_tests.cpp    | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3c6d4311/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
index 56a63e8..d3c9aed 100644
--- a/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
+++ b/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
@@ -73,11 +73,17 @@ TEST(ProcTest, SingleThread)
 }
 
 
-void* threadFunction(void*)
+void* cancelFunction(void*)
 {
   // Newly created threads have PTHREAD_CANCEL_ENABLE and
-  // PTHREAD_CANCEL_DEFERRED so they can be cancelled from the main thread.
-  while (true) { sleep(1); }
+  // PTHREAD_CANCEL_DEFERRED so they can be cancelled from the main
+  // thread.
+  while (true) {
+    // Use pthread_testcancel() as opposed to sleep() because we've
+    // seen sleep() hang on certain linux machines even though sleep
+    // should be a cancellation point.
+    pthread_testcancel();
+  }
 
   return NULL;
 }
@@ -93,7 +99,7 @@ TEST(ProcTest, MultipleThreads)
   // Create additional threads.
   for (size_t i = 0; i < numThreads; i++)
   {
-    EXPECT_EQ(0, pthread_create(&pthreads[i], NULL, threadFunction, NULL));
+    EXPECT_EQ(0, pthread_create(&pthreads[i], NULL, cancelFunction, NULL));
   }
 
   // Check we have the expected number of threads.


[2/4] mesos git commit: Fix a race between SocketManager link() and send().

Posted by be...@apache.org.
Fix a race between SocketManager link() and send().

Review: https://reviews.apache.org/r/27967


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b6e4010
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b6e4010
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b6e4010

Branch: refs/heads/master
Commit: 4b6e4010a3d6461e249b11320e1d39d373da7c94
Parents: 3c6d431
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:03:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:03:01 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4b6e4010/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 00cd89f..cf75b51 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1511,6 +1511,10 @@ void ignore_read_data(
 }
 
 
+// Forward declaration.
+void send(Encoder* encoder, Socket* socket);
+
+
 void link_connect(const Future<Nothing>& future, Socket* socket)
 {
   if (future.isDiscarded() || future.isFailed()) {
@@ -1532,8 +1536,26 @@ void link_connect(const Future<Nothing>& future, Socket* socket)
         socket,
         data,
         size));
+
+  // In order to avoid a race condition where internal::send() is
+  // called after SocketManager::link() but before the socket is
+  // connected, we initialize the 'outgoing' queue in
+  // SocketManager::link() and then check if the queue has anything in
+  // it to send during this connection completion. When a subsequent
+  // call to SocketManager::send() occurs we'll now just add the
+  // encoder to the 'outgoing' queue, and when we complete the
+  // connection here we'll start sending, otherwise when we call
+  // SocketManager::next() the 'outgoing' queue will get removed and
+  // any subsequent call to SocketManager::send() will take care of
+  // setting it back up and sending.
+  Encoder* encoder = socket_manager->next(*socket);
+
+  if (encoder != NULL) {
+    send(encoder, new Socket(*socket));
+  }
 }
 
+
 } // namespace internal {
 
 
@@ -1563,6 +1585,13 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       persists[to.node] = s;
 
+      // Initialize 'outgoing' to prevent a race with
+      // SocketManager::send() while the socket is not yet connected.
+      // Initializing the 'outgoing' queue prevents
+      // SocketManager::send() from trying to write before it's
+      // connected.
+      outgoing[s];
+
       socket.connect(to.node)
         .onAny(lambda::bind(
             &internal::link_connect,


[4/4] mesos git commit: Replace std::queue with std::vector in Libprocess Future.

Posted by be...@apache.org.
Replace std::queue with std::vector in Libprocess Future.

Use std::vector instead of std::queue to reduce dynamic allocations
and improve performance.

Review: https://reviews.apache.org/r/28196


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da7b4ac6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da7b4ac6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da7b4ac6

Branch: refs/heads/master
Commit: da7b4ac6c19e932e10d6c0321a2afcb7198883b5
Parents: 2885c80
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:04:04 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:04:05 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 50 +++++++++++----------
 1 file changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/da7b4ac6/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 608934d..0326b23 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -6,11 +6,11 @@
 
 #include <iostream>
 #include <list>
-#include <queue>
 #include <set>
 #if  __cplusplus >= 201103L
 #include <type_traits>
 #endif // __cplusplus >= 201103L
+#include <vector>
 
 #include <glog/logging.h>
 
@@ -547,11 +547,11 @@ private:
     bool associated;
     T* t;
     std::string* message; // Message associated with failure.
-    std::queue<DiscardCallback> onDiscardCallbacks;
-    std::queue<ReadyCallback> onReadyCallbacks;
-    std::queue<FailedCallback> onFailedCallbacks;
-    std::queue<DiscardedCallback> onDiscardedCallbacks;
-    std::queue<AnyCallback> onAnyCallbacks;
+    std::vector<DiscardCallback> onDiscardCallbacks;
+    std::vector<ReadyCallback> onReadyCallbacks;
+    std::vector<FailedCallback> onFailedCallbacks;
+    std::vector<DiscardedCallback> onDiscardedCallbacks;
+    std::vector<AnyCallback> onAnyCallbacks;
   };
 
   // Sets the value for this future, unless the future is already set,
@@ -570,13 +570,10 @@ namespace internal {
 
   // Helper for executing callbacks that have been registered.
   template <typename C, typename... Arguments>
-  void run(std::queue<C>& callbacks, Arguments&&... arguments)
+  void run(const std::vector<C>& callbacks, Arguments&&... arguments)
   {
-    // TODO(jmlvanre): replace this with a vector and make parameter
-    // const since we can iterate it safely.
-    while (!callbacks.empty()) {
-      callbacks.front()(std::forward<Arguments>(arguments)...);
-      callbacks.pop();
+    for (size_t i = 0; i < callbacks.size(); ++i) {
+      callbacks[i](std::forward<Arguments>(arguments)...);
     }
   }
 
@@ -947,8 +944,10 @@ bool Promise<T>::discard(Future<T> future)
   // DISCARDED so there should not be any concurrent modifications.
   if (result) {
     internal::run(future.data->onDiscardedCallbacks);
+    future.data->onDiscardedCallbacks.clear();
 
     internal::run(future.data->onAnyCallbacks, future);
+    future.data->onAnyCallbacks.clear();
   }
 
   return result;
@@ -1083,6 +1082,7 @@ bool Future<T>::discard()
   // 'Data::onDiscardCallbacks'.
   if (result) {
     internal::run(data->onDiscardCallbacks);
+    data->onDiscardCallbacks.clear();
   }
 
   return result;
@@ -1157,7 +1157,7 @@ bool Future<T>::await(const Duration& duration) const
   {
     if (data->state == PENDING) {
       pending = true;
-      data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch));
+      data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
     }
   }
   internal::release(&data->lock);
@@ -1210,7 +1210,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
     if (data->discard) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onDiscardCallbacks.push(std::move(callback));
+      data->onDiscardCallbacks.emplace_back(std::move(callback));
     }
   }
   internal::release(&data->lock);
@@ -1234,7 +1234,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
     if (data->state == READY) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onReadyCallbacks.push(std::move(callback));
+      data->onReadyCallbacks.emplace_back(std::move(callback));
     }
   }
   internal::release(&data->lock);
@@ -1258,7 +1258,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
     if (data->state == FAILED) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onFailedCallbacks.push(std::move(callback));
+      data->onFailedCallbacks.emplace_back(std::move(callback));
     }
   }
   internal::release(&data->lock);
@@ -1282,7 +1282,7 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
     if (data->state == DISCARDED) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onDiscardedCallbacks.push(std::move(callback));
+      data->onDiscardedCallbacks.emplace_back(std::move(callback));
     }
   }
   internal::release(&data->lock);
@@ -1304,7 +1304,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
   internal::acquire(&data->lock);
   {
     if (data->state == PENDING) {
-      data->onAnyCallbacks.push(std::move(callback));
+      data->onAnyCallbacks.emplace_back(std::move(callback));
     } else {
       run = true;
     }
@@ -1330,7 +1330,7 @@ const Future<T>& Future<T>::onDiscard(const DiscardCallback& callback) const
     if (data->discard) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onDiscardCallbacks.push(callback);
+      data->onDiscardCallbacks.push_back(callback);
     }
   }
   internal::release(&data->lock);
@@ -1354,7 +1354,7 @@ const Future<T>& Future<T>::onReady(const ReadyCallback& callback) const
     if (data->state == READY) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onReadyCallbacks.push(callback);
+      data->onReadyCallbacks.push_back(callback);
     }
   }
   internal::release(&data->lock);
@@ -1378,7 +1378,7 @@ const Future<T>& Future<T>::onFailed(const FailedCallback& callback) const
     if (data->state == FAILED) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onFailedCallbacks.push(callback);
+      data->onFailedCallbacks.push_back(callback);
     }
   }
   internal::release(&data->lock);
@@ -1403,7 +1403,7 @@ const Future<T>& Future<T>::onDiscarded(
     if (data->state == DISCARDED) {
       run = true;
     } else if (data->state == PENDING) {
-      data->onDiscardedCallbacks.push(callback);
+      data->onDiscardedCallbacks.push_back(callback);
     }
   }
   internal::release(&data->lock);
@@ -1425,7 +1425,7 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
   internal::acquire(&data->lock);
   {
     if (data->state == PENDING) {
-      data->onAnyCallbacks.push(callback);
+      data->onAnyCallbacks.push_back(callback);
     } else {
       run = true;
     }
@@ -1609,8 +1609,10 @@ bool Future<T>::set(const T& _t)
   // should not be any concurrent modications.
   if (result) {
     internal::run(data->onReadyCallbacks, *data->t);
+    data->onReadyCallbacks.clear();
 
     internal::run(data->onAnyCallbacks, *this);
+    data->onAnyCallbacks.clear();
   }
 
   return result;
@@ -1637,8 +1639,10 @@ bool Future<T>::fail(const std::string& _message)
   // should not be any concurrent modications.
   if (result) {
     internal::run(data->onFailedCallbacks, *data->message);
+    data->onFailedCallbacks.clear();
 
     internal::run(data->onAnyCallbacks, *this);
+    data->onAnyCallbacks.clear();
   }
 
   return result;


[3/4] mesos git commit: Factor out Future callback invocation.

Posted by be...@apache.org.
Factor out Future callback invocation.

Factor out callback invocation in Future to make the logic easier to
read. It also de-duplicates some code.

Review: https://reviews.apache.org/r/28195


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2885c809
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2885c809
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2885c809

Branch: refs/heads/master
Commit: 2885c809a0ee00e1ddbc72a10fbaa3456115eca0
Parents: 4b6e401
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:03:37 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:03:39 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 59 +++++++++------------
 1 file changed, 24 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2885c809/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 2e4f9ef..608934d 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -566,6 +566,23 @@ private:
 };
 
 
+namespace internal {
+
+  // Helper for executing callbacks that have been registered.
+  template <typename C, typename... Arguments>
+  void run(std::queue<C>& callbacks, Arguments&&... arguments)
+  {
+    // TODO(jmlvanre): replace this with a vector and make parameter
+    // const since we can iterate it safely.
+    while (!callbacks.empty()) {
+      callbacks.front()(std::forward<Arguments>(arguments)...);
+      callbacks.pop();
+    }
+  }
+
+} // namespace internal {
+
+
 // Represents a weak reference to a future. This class is used to
 // break cyclic dependencies between futures.
 template <typename T>
@@ -929,17 +946,9 @@ bool Promise<T>::discard(Future<T> future)
   // DISCARDED. We don't need a lock because the state is now in
   // DISCARDED so there should not be any concurrent modifications.
   if (result) {
-    while (!data->onDiscardedCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onDiscardedCallbacks.front()();
-      data->onDiscardedCallbacks.pop();
-    }
+    internal::run(future.data->onDiscardedCallbacks);
 
-    while (!data->onAnyCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onAnyCallbacks.front()(future);
-      data->onAnyCallbacks.pop();
-    }
+    internal::run(future.data->onAnyCallbacks, future);
   }
 
   return result;
@@ -1073,11 +1082,7 @@ bool Future<T>::discard()
   // be set so we won't be adding anything else to
   // 'Data::onDiscardCallbacks'.
   if (result) {
-    while (!data->onDiscardCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onDiscardCallbacks.front()();
-      data->onDiscardCallbacks.pop();
-    }
+    internal::run(data->onDiscardCallbacks);
   }
 
   return result;
@@ -1603,17 +1608,9 @@ bool Future<T>::set(const T& _t)
   // don't need a lock because the state is now in READY so there
   // should not be any concurrent modications.
   if (result) {
-    while (!data->onReadyCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onReadyCallbacks.front()(*data->t);
-      data->onReadyCallbacks.pop();
-    }
+    internal::run(data->onReadyCallbacks, *data->t);
 
-    while (!data->onAnyCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onAnyCallbacks.front()(*this);
-      data->onAnyCallbacks.pop();
-    }
+    internal::run(data->onAnyCallbacks, *this);
   }
 
   return result;
@@ -1639,17 +1636,9 @@ bool Future<T>::fail(const std::string& _message)
   // don't need a lock because the state is now in FAILED so there
   // should not be any concurrent modications.
   if (result) {
-    while (!data->onFailedCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onFailedCallbacks.front()(*data->message);
-      data->onFailedCallbacks.pop();
-    }
+    internal::run(data->onFailedCallbacks, *data->message);
 
-    while (!data->onAnyCallbacks.empty()) {
-      // TODO(*): Invoke callbacks in another execution context.
-      data->onAnyCallbacks.front()(*this);
-      data->onAnyCallbacks.pop();
-    }
+    internal::run(data->onAnyCallbacks, *this);
   }
 
   return result;