You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/21 17:04:31 UTC
[1/4] mesos git commit: Fix ProcTest.MultipleThreads hanging.
Repository: mesos
Updated Branches:
refs/heads/master 993dc2dc5 -> da7b4ac6c
Fix ProcTest.MultipleThreads hanging.
Replace 'sleep' with 'pthread_testcancel'.
Review: https://reviews.apache.org/r/28137
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c6d4311
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c6d4311
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c6d4311
Branch: refs/heads/master
Commit: 3c6d43117dd832a992343cfe46893b56517d8ffa
Parents: 993dc2d
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:02:37 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:02:38 2014 -0800
----------------------------------------------------------------------
.../libprocess/3rdparty/stout/tests/proc_tests.cpp | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3c6d4311/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
index 56a63e8..d3c9aed 100644
--- a/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
+++ b/3rdparty/libprocess/3rdparty/stout/tests/proc_tests.cpp
@@ -73,11 +73,17 @@ TEST(ProcTest, SingleThread)
}
-void* threadFunction(void*)
+void* cancelFunction(void*)
{
// Newly created threads have PTHREAD_CANCEL_ENABLE and
- // PTHREAD_CANCEL_DEFERRED so they can be cancelled from the main thread.
- while (true) { sleep(1); }
+ // PTHREAD_CANCEL_DEFERRED so they can be cancelled from the main
+ // thread.
+ while (true) {
+ // Use pthread_testcancel() as opposed to sleep() because we've
+ // seen sleep() hang on certain linux machines even though sleep
+ // should be a cancellation point.
+ pthread_testcancel();
+ }
return NULL;
}
@@ -93,7 +99,7 @@ TEST(ProcTest, MultipleThreads)
// Create additional threads.
for (size_t i = 0; i < numThreads; i++)
{
- EXPECT_EQ(0, pthread_create(&pthreads[i], NULL, threadFunction, NULL));
+ EXPECT_EQ(0, pthread_create(&pthreads[i], NULL, cancelFunction, NULL));
}
// Check we have the expected number of threads.
[2/4] mesos git commit: Fix a race between SocketManager link() and send().
Posted by be...@apache.org.
Fix a race between SocketManager link() and send().
Review: https://reviews.apache.org/r/27967
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b6e4010
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b6e4010
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b6e4010
Branch: refs/heads/master
Commit: 4b6e4010a3d6461e249b11320e1d39d373da7c94
Parents: 3c6d431
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:03:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:03:01 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 29 +++++++++++++++++++++++++++++
1 file changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b6e4010/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 00cd89f..cf75b51 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1511,6 +1511,10 @@ void ignore_read_data(
}
+// Forward declaration.
+void send(Encoder* encoder, Socket* socket);
+
+
void link_connect(const Future<Nothing>& future, Socket* socket)
{
if (future.isDiscarded() || future.isFailed()) {
@@ -1532,8 +1536,26 @@ void link_connect(const Future<Nothing>& future, Socket* socket)
socket,
data,
size));
+
+ // In order to avoid a race condition where internal::send() is
+ // called after SocketManager::link() but before the socket is
+ // connected, we initialize the 'outgoing' queue in
+ // SocketManager::link() and then check if the queue has anything in
+ // it to send during this connection completion. When a subsequent
+ // call to SocketManager::send() occurs we'll now just add the
+ // encoder to the 'outgoing' queue, and when we complete the
+ // connection here we'll start sending, otherwise when we call
+ // SocketManager::next() the 'outgoing' queue will get removed and
+ // any subsequent call to SocketManager::send() will take care of
+ // setting it back up and sending.
+ Encoder* encoder = socket_manager->next(*socket);
+
+ if (encoder != NULL) {
+ send(encoder, new Socket(*socket));
+ }
}
+
} // namespace internal {
@@ -1563,6 +1585,13 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
persists[to.node] = s;
+ // Initialize 'outgoing' to prevent a race with
+ // SocketManager::send() while the socket is not yet connected.
+ // Initializing the 'outgoing' queue prevents
+ // SocketManager::send() from trying to write before it's
+ // connected.
+ outgoing[s];
+
socket.connect(to.node)
.onAny(lambda::bind(
&internal::link_connect,
[4/4] mesos git commit: Replace std::queue with std::vector in Libprocess Future.
Posted by be...@apache.org.
Replace std::queue with std::vector in Libprocess Future.
Use std::vector instead of std::queue to reduce dynamic allocations
and improve performance.
Review: https://reviews.apache.org/r/28196
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da7b4ac6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da7b4ac6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da7b4ac6
Branch: refs/heads/master
Commit: da7b4ac6c19e932e10d6c0321a2afcb7198883b5
Parents: 2885c80
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:04:04 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:04:05 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 50 +++++++++++----------
1 file changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/da7b4ac6/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 608934d..0326b23 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -6,11 +6,11 @@
#include <iostream>
#include <list>
-#include <queue>
#include <set>
#if __cplusplus >= 201103L
#include <type_traits>
#endif // __cplusplus >= 201103L
+#include <vector>
#include <glog/logging.h>
@@ -547,11 +547,11 @@ private:
bool associated;
T* t;
std::string* message; // Message associated with failure.
- std::queue<DiscardCallback> onDiscardCallbacks;
- std::queue<ReadyCallback> onReadyCallbacks;
- std::queue<FailedCallback> onFailedCallbacks;
- std::queue<DiscardedCallback> onDiscardedCallbacks;
- std::queue<AnyCallback> onAnyCallbacks;
+ std::vector<DiscardCallback> onDiscardCallbacks;
+ std::vector<ReadyCallback> onReadyCallbacks;
+ std::vector<FailedCallback> onFailedCallbacks;
+ std::vector<DiscardedCallback> onDiscardedCallbacks;
+ std::vector<AnyCallback> onAnyCallbacks;
};
// Sets the value for this future, unless the future is already set,
@@ -570,13 +570,10 @@ namespace internal {
// Helper for executing callbacks that have been registered.
template <typename C, typename... Arguments>
- void run(std::queue<C>& callbacks, Arguments&&... arguments)
+ void run(const std::vector<C>& callbacks, Arguments&&... arguments)
{
- // TODO(jmlvanre): replace this with a vector and make parameter
- // const since we can iterate it safely.
- while (!callbacks.empty()) {
- callbacks.front()(std::forward<Arguments>(arguments)...);
- callbacks.pop();
+ for (size_t i = 0; i < callbacks.size(); ++i) {
+ callbacks[i](std::forward<Arguments>(arguments)...);
}
}
@@ -947,8 +944,10 @@ bool Promise<T>::discard(Future<T> future)
// DISCARDED so there should not be any concurrent modifications.
if (result) {
internal::run(future.data->onDiscardedCallbacks);
+ future.data->onDiscardedCallbacks.clear();
internal::run(future.data->onAnyCallbacks, future);
+ future.data->onAnyCallbacks.clear();
}
return result;
@@ -1083,6 +1082,7 @@ bool Future<T>::discard()
// 'Data::onDiscardCallbacks'.
if (result) {
internal::run(data->onDiscardCallbacks);
+ data->onDiscardCallbacks.clear();
}
return result;
@@ -1157,7 +1157,7 @@ bool Future<T>::await(const Duration& duration) const
{
if (data->state == PENDING) {
pending = true;
- data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch));
+ data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
}
}
internal::release(&data->lock);
@@ -1210,7 +1210,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardCallbacks.push(std::move(callback));
+ data->onDiscardCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1234,7 +1234,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
- data->onReadyCallbacks.push(std::move(callback));
+ data->onReadyCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1258,7 +1258,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
- data->onFailedCallbacks.push(std::move(callback));
+ data->onFailedCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1282,7 +1282,7 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardedCallbacks.push(std::move(callback));
+ data->onDiscardedCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1304,7 +1304,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
internal::acquire(&data->lock);
{
if (data->state == PENDING) {
- data->onAnyCallbacks.push(std::move(callback));
+ data->onAnyCallbacks.emplace_back(std::move(callback));
} else {
run = true;
}
@@ -1330,7 +1330,7 @@ const Future<T>& Future<T>::onDiscard(const DiscardCallback& callback) const
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardCallbacks.push(callback);
+ data->onDiscardCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1354,7 +1354,7 @@ const Future<T>& Future<T>::onReady(const ReadyCallback& callback) const
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
- data->onReadyCallbacks.push(callback);
+ data->onReadyCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1378,7 +1378,7 @@ const Future<T>& Future<T>::onFailed(const FailedCallback& callback) const
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
- data->onFailedCallbacks.push(callback);
+ data->onFailedCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1403,7 +1403,7 @@ const Future<T>& Future<T>::onDiscarded(
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardedCallbacks.push(callback);
+ data->onDiscardedCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1425,7 +1425,7 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
internal::acquire(&data->lock);
{
if (data->state == PENDING) {
- data->onAnyCallbacks.push(callback);
+ data->onAnyCallbacks.push_back(callback);
} else {
run = true;
}
@@ -1609,8 +1609,10 @@ bool Future<T>::set(const T& _t)
// should not be any concurrent modications.
if (result) {
internal::run(data->onReadyCallbacks, *data->t);
+ data->onReadyCallbacks.clear();
internal::run(data->onAnyCallbacks, *this);
+ data->onAnyCallbacks.clear();
}
return result;
@@ -1637,8 +1639,10 @@ bool Future<T>::fail(const std::string& _message)
// should not be any concurrent modications.
if (result) {
internal::run(data->onFailedCallbacks, *data->message);
+ data->onFailedCallbacks.clear();
internal::run(data->onAnyCallbacks, *this);
+ data->onAnyCallbacks.clear();
}
return result;
[3/4] mesos git commit: Factor out Future callback invocation.
Posted by be...@apache.org.
Factor out Future callback invocation.
Factor out callback invocation in Future to make the logic easier to
read. It also de-duplicates some code.
Review: https://reviews.apache.org/r/28195
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2885c809
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2885c809
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2885c809
Branch: refs/heads/master
Commit: 2885c809a0ee00e1ddbc72a10fbaa3456115eca0
Parents: 4b6e401
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:03:37 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:03:39 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 59 +++++++++------------
1 file changed, 24 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2885c809/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 2e4f9ef..608934d 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -566,6 +566,23 @@ private:
};
+namespace internal {
+
+ // Helper for executing callbacks that have been registered.
+ template <typename C, typename... Arguments>
+ void run(std::queue<C>& callbacks, Arguments&&... arguments)
+ {
+ // TODO(jmlvanre): replace this with a vector and make parameter
+ // const since we can iterate it safely.
+ while (!callbacks.empty()) {
+ callbacks.front()(std::forward<Arguments>(arguments)...);
+ callbacks.pop();
+ }
+ }
+
+} // namespace internal {
+
+
// Represents a weak reference to a future. This class is used to
// break cyclic dependencies between futures.
template <typename T>
@@ -929,17 +946,9 @@ bool Promise<T>::discard(Future<T> future)
// DISCARDED. We don't need a lock because the state is now in
// DISCARDED so there should not be any concurrent modifications.
if (result) {
- while (!data->onDiscardedCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onDiscardedCallbacks.front()();
- data->onDiscardedCallbacks.pop();
- }
+ internal::run(future.data->onDiscardedCallbacks);
- while (!data->onAnyCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onAnyCallbacks.front()(future);
- data->onAnyCallbacks.pop();
- }
+ internal::run(future.data->onAnyCallbacks, future);
}
return result;
@@ -1073,11 +1082,7 @@ bool Future<T>::discard()
// be set so we won't be adding anything else to
// 'Data::onDiscardCallbacks'.
if (result) {
- while (!data->onDiscardCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onDiscardCallbacks.front()();
- data->onDiscardCallbacks.pop();
- }
+ internal::run(data->onDiscardCallbacks);
}
return result;
@@ -1603,17 +1608,9 @@ bool Future<T>::set(const T& _t)
// don't need a lock because the state is now in READY so there
// should not be any concurrent modications.
if (result) {
- while (!data->onReadyCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onReadyCallbacks.front()(*data->t);
- data->onReadyCallbacks.pop();
- }
+ internal::run(data->onReadyCallbacks, *data->t);
- while (!data->onAnyCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onAnyCallbacks.front()(*this);
- data->onAnyCallbacks.pop();
- }
+ internal::run(data->onAnyCallbacks, *this);
}
return result;
@@ -1639,17 +1636,9 @@ bool Future<T>::fail(const std::string& _message)
// don't need a lock because the state is now in FAILED so there
// should not be any concurrent modications.
if (result) {
- while (!data->onFailedCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onFailedCallbacks.front()(*data->message);
- data->onFailedCallbacks.pop();
- }
+ internal::run(data->onFailedCallbacks, *data->message);
- while (!data->onAnyCallbacks.empty()) {
- // TODO(*): Invoke callbacks in another execution context.
- data->onAnyCallbacks.front()(*this);
- data->onAnyCallbacks.pop();
- }
+ internal::run(data->onAnyCallbacks, *this);
}
return result;