Posted to commits@mesos.apache.org by jp...@apache.org on 2018/08/07 14:47:41 UTC
[mesos] branch master updated: Replaced exchange in MpscLinkedQueue::dequeue with load/store.
This is an automated email from the ASF dual-hosted git repository.
jpeach pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 57f22aa Replaced exchange in MpscLinkedQueue::dequeue with load/store.
57f22aa is described below
commit 57f22aad4ce12c607a2547f9801c70eea595cc90
Author: Dario Rexin <dr...@apple.com>
AuthorDate: Mon Aug 6 13:29:08 2018 -0700
Replaced exchange in MpscLinkedQueue::dequeue with load/store.
There's only a single consumer, so we don't need an exchange
in dequeue. Testing shows that a load/store pair is much more
efficient, as it needs fewer CPU cycles; this is especially true
in the empty-queue case, where the store does not have to be
executed at all.
Review: https://reviews.apache.org/r/68149/
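
For illustration only, here is a minimal, self-contained sketch of the pattern
the commit describes. It is not the actual process::MpscLinkedQueue
implementation; the class, member, and variable names (MpscQueueSketch, Node,
head, tail) are simplified assumptions based on the diff below. Producers
contend on `head` with an atomic exchange, while the single consumer owns
`tail` and can therefore use a plain load/store pair, skipping the store
entirely when the queue is empty.

#include <atomic>

// Sketch of a multi-producer single-consumer linked queue. Producers
// contend on `head` via exchange; the single consumer owns `tail`.
template <typename T>
class MpscQueueSketch
{
  struct Node
  {
    explicit Node(T* e = nullptr) : element(e) {}
    T* element;
    std::atomic<Node*> next{nullptr};
  };

public:
  MpscQueueSketch() : tail(new Node()), head(tail) {}

  ~MpscQueueSketch()
  {
    // Single-threaded teardown; remaining elements are not freed here.
    Node* node = tail;
    while (node != nullptr) {
      Node* next = node->next.load(std::memory_order_relaxed);
      delete node;
      node = next;
    }
  }

  // May be called from any number of producer threads.
  void enqueue(T* element)
  {
    Node* node = new Node(element);

    // The exchange hands the previous head to exactly one producer,
    // so the link below is race-free and enqueue stays wait-free.
    Node* previous = head.exchange(node, std::memory_order_acq_rel);
    previous->next.store(node, std::memory_order_release);
  }

  // Must only be called from the single consumer thread.
  T* dequeue()
  {
    Node* current = tail;

    // A plain acquire load replaces the old exchange: no other thread
    // dequeues, so nobody can race us for `next`.
    Node* next = current->next.load(std::memory_order_acquire);

    if (next == nullptr) {
      // Empty queue: return without executing any store at all.
      if (head.load(std::memory_order_relaxed) == current) {
        return nullptr;
      }
      // A producer swapped `head` but has not linked its node yet;
      // spin until the link becomes visible.
      do {
        next = current->next.load(std::memory_order_acquire);
      } while (next == nullptr);
    }

    // Non-empty path: one store to disconnect the old stub node.
    current->next.store(nullptr, std::memory_order_release);

    T* element = next->element;
    next->element = nullptr;

    tail = next;     // The dequeued node becomes the new stub.
    delete current;  // Retire the old stub.
    return element;
  }

private:
  Node* tail;              // Consumer end (stub node); consumer-only.
  std::atomic<Node*> head; // Producer end; shared by all producers.
};

The key difference from the pre-commit version is in dequeue(): the exchange
on `next` is split into an acquire load plus, only on the non-empty path, a
single release store, which is what the benchmarks below measure.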
---
3rdparty/libprocess/src/mpsc_linked_queue.hpp | 14 ++---
3rdparty/libprocess/src/tests/benchmarks.cpp | 79 +++++++++++++++++++++++++++
2 files changed, 86 insertions(+), 7 deletions(-)
diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
index fc55d43..183e619 100644
--- a/3rdparty/libprocess/src/mpsc_linked_queue.hpp
+++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
@@ -75,7 +75,7 @@ public:
// Exchange is guaranteed to only give the old value to one
// producer, so this is safe and wait-free.
- auto oldhead = head.exchange(newNode, std::memory_order_release);
+ auto oldhead = head.exchange(newNode, std::memory_order_acq_rel);
// At this point if this thread context switches out we may block
// the consumer from doing a dequeue (see below). Eventually we'll
@@ -91,9 +91,7 @@ public:
// Check and see if there is an actual element linked from `tail`
// since we use `tail` as a "stub" rather than the actual element.
- auto nextTail = currentTail->next.exchange(
- nullptr,
- std::memory_order_relaxed);
+ auto nextTail = currentTail->next.load(std::memory_order_acquire);
// There are three possible cases here:
//
@@ -115,14 +113,16 @@ public:
// connect it to the tail, yet, so we spin-wait. At this point
// we are not wait-free anymore.
do {
- nextTail = currentTail->next.exchange(
- nullptr,
- std::memory_order_relaxed);
+ nextTail = currentTail->next.load(std::memory_order_acquire);
} while (nextTail == nullptr);
}
CHECK_NOTNULL(nextTail);
+ // Set next pointer of current tail to null to disconnect it
+ // from the queue.
+ currentTail->next.store(nullptr, std::memory_order_release);
+
auto element = nextTail->element;
nextTail->element = nullptr;
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 0b94aaf..16b3bca 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -837,3 +837,82 @@ TEST_P(Metrics_BENCHMARK_Test, Scalability)
std::cout << "Removed " << metrics_count << " counters in "
<< watch.elapsed() << std::endl;
}
+
+
+TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueueEmpty)
+{
+ const int messageCount = 1000000000;
+ process::MpscLinkedQueue<std::string> q;
+
+ Stopwatch consumerWatch;
+ consumerWatch.start();
+
+ for (int i = messageCount; i > 0; i--) {
+ q.dequeue();
+ }
+
+ consumerWatch.stop();
+
+ Duration consumerElapsed = consumerWatch.elapsed();
+
+ double consumerThroughput = messageCount / consumerElapsed.secs();
+
+ cout << "Estimated consumer throughput: "
+ << std::fixed << consumerThroughput << " op/s" << endl;
+}
+
+
+TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueueNonContendedRead)
+{
+ // NOTE: we set the total number of producers to be 1 less than the
+ // hardware concurrency so the consumer doesn't have to fight for
+ // processing time with the producers.
+ const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
+ const int messageCount = 10000000;
+ const int totalCount = messageCount * producerCount;
+ std::string* s = new std::string("");
+ process::MpscLinkedQueue<std::string> q;
+
+ std::vector<std::thread> producers;
+ for (unsigned int t = 0; t < producerCount; t++) {
+ producers.push_back(std::thread([s, &q]() {
+ for (int i = 0; i < messageCount; i++) {
+ q.enqueue(s);
+ }
+ }));
+ }
+
+ Stopwatch producerWatch;
+ producerWatch.start();
+
+ for (std::thread& producer : producers) {
+ producer.join();
+ }
+
+ producerWatch.stop();
+
+ Stopwatch consumerWatch;
+ consumerWatch.start();
+
+ for (int i = totalCount; i > 0;) {
+ if (q.dequeue() != nullptr) {
+ i--;
+ }
+ }
+
+ consumerWatch.stop();
+
+ Duration producerElapsed = producerWatch.elapsed();
+ Duration consumerElapsed = consumerWatch.elapsed();
+
+ double consumerThroughput = totalCount / consumerElapsed.secs();
+ double producerThroughput = totalCount / producerElapsed.secs();
+ double throughput = consumerThroughput + producerThroughput;
+
+ cout << "Estimated producer throughput (" << producerCount << " threads): "
+ << std::fixed << producerThroughput << " op/s" << endl;
+ cout << "Estimated consumer throughput: "
+ << std::fixed << consumerThroughput << " op/s" << endl;
+ cout << "Estimated total throughput: "
+ << std::fixed << throughput << " op/s" << endl;
+}
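
As a rough usage illustration (again an assumption, not part of the commit),
the sketch above could be driven the same way as the non-contended-read
benchmark: a set of producer threads enqueue a shared element, then the single
consumer drains the queue after they have joined.

#include <iostream>
#include <string>
#include <thread>
#include <vector>

int main()
{
  // Leave one core for the consumer, as the benchmark above does.
  const unsigned int hwConcurrency = std::thread::hardware_concurrency();
  const unsigned int producerCount = hwConcurrency > 1 ? hwConcurrency - 1 : 1;
  const int messageCount = 1000000;
  const long long total =
    static_cast<long long>(producerCount) * messageCount;

  MpscQueueSketch<std::string> q;
  std::string payload("ping");

  std::vector<std::thread> producers;
  for (unsigned int t = 0; t < producerCount; t++) {
    producers.push_back(std::thread([&q, &payload, messageCount]() {
      for (int i = 0; i < messageCount; i++) {
        q.enqueue(&payload);
      }
    }));
  }

  for (std::thread& producer : producers) {
    producer.join();
  }

  // Single consumer drains everything after the producers are done.
  long long remaining = total;
  while (remaining > 0) {
    if (q.dequeue() != nullptr) {
      remaining--;
    }
  }

  std::cout << "Drained " << total << " elements" << std::endl;
  return 0;
}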