You are viewing a plain-text version of this content. (The canonical link, "here", was a hyperlink on the original page and did not survive conversion to plain text.)
Posted to commits@mesos.apache.org by be...@apache.org on 2018/07/15 17:02:02 UTC
mesos git commit: Added mpsc_linked_queue and use it as the
concurrent event queue.
Repository: mesos
Updated Branches:
refs/heads/master a11a6a3d8 -> b1eafc035
Added mpsc_linked_queue and use it as the concurrent event queue.
https://reviews.apache.org/r/62515
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
Branch: refs/heads/master
Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
Parents: a11a6a3
Author: Dario Rexin <dr...@apple.com>
Authored: Sat Jul 7 13:20:22 2018 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jul 15 09:55:28 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/event_queue.hpp | 168 ++---------------
3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++
3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++-
.../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++
6 files changed, 367 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 2d356aa..631491a 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -307,6 +307,7 @@ libprocess_tests_SOURCES = \
src/tests/loop_tests.cpp \
src/tests/main.cpp \
src/tests/metrics_tests.cpp \
+ src/tests/mpsc_linked_queue_tests.cpp \
src/tests/mutex_tests.cpp \
src/tests/owned_tests.cpp \
src/tests/process_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
index 21c522d..999d552 100644
--- a/3rdparty/libprocess/src/event_queue.hpp
+++ b/3rdparty/libprocess/src/event_queue.hpp
@@ -17,10 +17,6 @@
#include <mutex>
#include <string>
-#ifdef LOCK_FREE_EVENT_QUEUE
-#include <concurrentqueue.h>
-#endif // LOCK_FREE_EVENT_QUEUE
-
#include <process/event.hpp>
#include <process/http.hpp>
@@ -28,6 +24,10 @@
#include <stout/stringify.hpp>
#include <stout/synchronized.hpp>
+#ifdef LOCK_FREE_EVENT_QUEUE
+#include "mpsc_linked_queue.hpp"
+#endif // LOCK_FREE_EVENT_QUEUE
+
namespace process {
// A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
@@ -187,185 +187,55 @@ private:
#else // LOCK_FREE_EVENT_QUEUE
+ // Queues `event` while `comissioned` is true; otherwise the event is
+ // deleted here instead of being queued.
void enqueue(Event* event)
{
- Item item = {sequence.fetch_add(1), event};
if (comissioned.load()) {
- queue.enqueue(std::move(item));
+ queue.enqueue(event);
} else {
- sequence.fetch_sub(1);
delete event;
}
}
+ // Returns `nullptr` if the queue is empty (that is what
+ // `MpscLinkedQueue::dequeue()` returns), unlike the removed
+ // implementation which spun until an event arrived.
Event* dequeue()
{
- // NOTE: for performance reasons we don't check `comissioned` here
- // so it's possible that we'll loop forever if a consumer called
- // `decomission()` and then subsequently called `dequeue()`.
- Event* event = nullptr;
- do {
- // Given the nature of the concurrent queue implementation it's
- // possible that we'll need to try to dequeue multiple times
- // until it returns an event even though we know there is an
- // event because the semantics are that we shouldn't call
- // `dequeue()` before calling `empty()`.
- event = try_dequeue();
- } while (event == nullptr);
- return event;
+ return queue.dequeue();
}
+ // Delegates to `MpscLinkedQueue::empty()`, which is documented as
+ // single-consumer only.
bool empty()
{
- // NOTE: for performance reasons we don't check `comissioned` here
- // so it's possible that we'll return true when in fact we've been
- // decomissioned and you shouldn't attempt to dequeue anything.
- return (sequence.load() - next) == 0;
+ return queue.empty();
}
void decomission()
{
- comissioned.store(true);
+ // Stop accepting events: `enqueue()` only queues an event while
+ // `comissioned` is true, so decomissioning must store `false`.
+ comissioned.store(false);
while (!empty()) {
- // NOTE: we use `try_dequeue()` here because we might be racing
- // with `enqueue()` where they've already incremented `sequence`
- // so we think there are more items to dequeue but they aren't
- // actually going to enqueue anything because they've since seen
- // `comissioned` is true. We'll attempt to dequeue with
- // `try_dequeue()` and eventually they'll decrement `sequence`
- // and so `empty()` will return true and we'll bail.
- Event* event = try_dequeue();
- if (event != nullptr) {
- delete event;
- }
+ delete dequeue();
}
}
+ // NOTE(review): `MpscLinkedQueue::for_each()` is documented as
+ // single-consumer only, so this presumably must be called from the
+ // consumer's thread -- confirm against callers.
template <typename T>
size_t count()
{
- // Try and dequeue more elements first!
- queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
-
- return std::count_if(
- items.begin(),
- items.end(),
- [](const Item& item) {
- if (item.event != nullptr) {
- return item.event->is<T>();
- }
- return false;
- });
+ size_t count = 0;
+ queue.for_each([&count](Event* event) {
+ if (event->is<T>()) {
+ count++;
+ }
+ });
+ return count;
}
+ // NOTE(review): like `count()`, this relies on the single-consumer
+ // `MpscLinkedQueue::for_each()`; confirm it runs on the consumer.
operator JSON::Array()
{
- // Try and dequeue more elements first!
- queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
-
JSON::Array array;
- foreach (const Item& item, items) {
- if (item.event != nullptr) {
- array.values.push_back(JSON::Object(*item.event));
- }
- }
+ queue.for_each([&array](Event* event) {
+ array.values.push_back(JSON::Object(*event));
+ });
return array;
}
- struct Item
- {
- uint64_t sequence;
- Event* event;
- };
-
- Event* try_dequeue()
- {
- // The general algoritm here is as follows: we bulk dequeue as
- // many items from the concurrent queue as possible. We then look
- // for the `next` item in the sequence hoping that it's at the
- // beginning of `items` but because the `queue` is not
- // linearizable it might be "out of order". If we find it out of
- // order we effectively dequeue it but leave it in `items` so as
- // not to incur any costly rearrangements/compactions in
- // `items`. We'll later pop the out of order items once they get
- // to the front.
-
- // Start by popping any items that we effectively dequeued but
- // didn't remove from `items` so as not to incur costly
- // rearragements/compactions.
- while (!items.empty() && next > items.front().sequence) {
- items.pop_front();
- }
-
- // Optimistically let's hope that the next item is at the front of
- // `item`. If so, pop the item, increment `next`, and return the
- // event.
- if (!items.empty() && items.front().sequence == next) {
- Event* event = items.front().event;
- items.pop_front();
- next += 1;
- return event;
- }
-
- size_t index = 0;
-
- do {
- // Now look for a potentially out of order item. If found,
- // signifiy the item has been dequeued by nulling the event
- // (necessary for the implementation of `count()` and `operator
- // JSON::Array()`) and return the event.
- for (; index < items.size(); index++) {
- if (items[index].sequence == next) {
- Event* event = items[index].event;
- items[index].event = nullptr;
- next += 1;
- return event;
- }
- }
-
- // If we can bulk dequeue more items then keep looking for the
- // out of order event!
- //
- // NOTE: we use the _small_ value of `4` to dequeue here since
- // in the presence of enough events being enqueued we could end
- // up spending a LONG time dequeuing here! Since the next event
- // in the sequence should really be close to the top of the
- // queue we use a small value to dequeue.
- //
- // The intuition here is this: the faster we can return the next
- // event the faster that event can get processed and the faster
- // it might generate other events that can get processed in
- // parallel by other threads and the more work we get done.
- } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
-
- return nullptr;
- }
-
// Underlying queue of items.
- moodycamel::ConcurrentQueue<Item> queue;
-
- // Counter to represent the item sequence. Note that we use a
- // unsigned 64-bit integer which means that even if we were adding
- // one item to the queue every nanosecond we'd be able to run for
- // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
- std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
-
- // Counter to represent the next item we expect to dequeue. Note
- // that we don't need to make this be atomic because only a single
- // consumer is ever reading or writing this variable!
- uint64_t next = 0;
-
- // Collection of bulk dequeued items that may be out of order. Note
- // that like `next` this will only ever be read/written by a single
- // consumer.
- //
- // The use of a deque was explicit because it is implemented as an
- // array of arrays (or vector of vectors) which usually gives good
- // performance for appending to the back and popping from the front
- // which is exactly what we need to do. To avoid any performance
- // issues that might be incurred we do not remove any items from the
- // middle of the deque (see comments in `try_dequeue()` above for
- // more details).
- std::deque<Item> items;
+ MpscLinkedQueue<Event> queue;
// Whether or not the event queue has been decomissioned. This must
// be atomic as it can be read by a producer even though it's only
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
new file mode 100644
index 0000000..48c9509
--- /dev/null
+++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
@@ -0,0 +1,179 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __MPSC_LINKED_QUEUE_HPP__
+#define __MPSC_LINKED_QUEUE_HPP__
+
+#include <atomic>
+#include <functional>
+
+#include <glog/logging.h>
+
+namespace process {
+
+// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
+// the core methods:
+// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
+//
+// which is a Java port of the MPSC algorithm as presented in following article:
+// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+//
+// The queue has following properties:
+// Producers are wait-free (one atomic exchange per enqueue)
+// Consumer is
+// - lock-free
+// - mostly wait-free, except when consumer reaches the end of the queue
+// and producer enqueued a new node, but did not update the next pointer
+// on the old node, yet
+template <typename T>
+class MpscLinkedQueue
+{
+private:
+  template <typename E>
+  struct Node
+  {
+  public:
+    explicit Node(E* element = nullptr) : element(element) {}
+
+    E* element;
+    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
+  };
+
+public:
+  MpscLinkedQueue()
+  {
+    // `tail` always points at a "stub" node whose `element` is unused;
+    // the first real element lives in `tail->next`.
+    tail = new Node<T>();
+    head.store(tail);
+  }
+
+  ~MpscLinkedQueue()
+  {
+    // The queue owns any elements still enqueued: delete them.
+    while (auto element = dequeue()) {
+      delete element;
+    }
+
+    // Only the stub node remains.
+    delete tail;
+  }
+
+  // Multi producer safe.
+  void enqueue(T* element)
+  {
+    // A `nullptr` is used to denote an empty queue when doing a
+    // `dequeue()` so producers can't use it as an element.
+    CHECK_NOTNULL(element);
+
+    auto newNode = new Node<T>(element);
+
+    // Exchange is guaranteed to only give the old value to one
+    // producer, so this is safe and wait-free.
+    //
+    // NOTE: this must be `acq_rel`, not just `release`. The release
+    // half publishes the construction of `newNode` to whichever
+    // producer swaps it out next. The acquire half makes the
+    // construction of `oldhead` (by another producer, or the stub from
+    // the constructor) visible before we store into `oldhead->next`.
+    auto oldhead = head.exchange(newNode, std::memory_order_acq_rel);
+
+    // At this point if this thread context switches out we may block
+    // the consumer from doing a dequeue (see below). Eventually we'll
+    // unblock the consumer once we run again and execute the next
+    // line of code.
+    //
+    // The `release` store pairs with the `acquire` loads of `next` in
+    // `dequeue()` and `for_each()` so the consumer sees `element`.
+    oldhead->next.store(newNode, std::memory_order_release);
+  }
+
+  // Single consumer only.
+  //
+  // Returns the oldest element, or `nullptr` if the queue is empty.
+  T* dequeue()
+  {
+    auto currentTail = tail;
+
+    // Check and see if there is an actual element linked from `tail`
+    // since we use `tail` as a "stub" rather than the actual element.
+    //
+    // NOTE: the load must be `acquire` so that the producer's writes
+    // to the node (in particular `element`) happen-before our reads.
+    // A plain load suffices (no exchange needed): each `next` is
+    // written exactly once, and `currentTail` is deleted below.
+    auto nextTail = currentTail->next.load(std::memory_order_acquire);
+
+    // There are three possible cases here:
+    //
+    // (1) The queue is empty.
+    // (2) The queue appears empty but a producer is still enqueuing
+    //     so let's wait for it and then dequeue.
+    // (3) We have something to dequeue.
+    //
+    // Start by checking if the queue is or appears empty.
+    if (nextTail == nullptr) {
+      // Now check if the queue is actually empty or just appears
+      // empty. If it's actually empty then return `nullptr` to denote
+      // emptiness.
+      if (head.load(std::memory_order_relaxed) == currentTail) {
+        return nullptr;
+      }
+
+      // Another thread already inserted a new node, but did not
+      // connect it to the tail, yet, so we spin-wait. At this point
+      // we are not wait-free anymore.
+      do {
+        nextTail = currentTail->next.load(std::memory_order_acquire);
+      } while (nextTail == nullptr);
+    }
+
+    CHECK_NOTNULL(nextTail);
+
+    // `nextTail` becomes the new stub, so take its element out.
+    auto element = nextTail->element;
+    nextTail->element = nullptr;
+
+    tail = nextTail;
+    delete currentTail;
+
+    return element;
+  }
+
+  // Single consumer only.
+  //
+  // Calls `f` on each element in FIFO order, limited to the nodes that
+  // were enqueued when the traversal started.
+  //
+  // TODO(drexin): Provide C++ style iteration so someone can just use
+  // the `std::for_each()`.
+  template <typename F>
+  void for_each(F&& f)
+  {
+    // There is a race with new nodes being added, so snapshot the last
+    // node now and never traverse past it. If the queue is empty at
+    // this point `end == tail` and nothing is visited.
+    auto end = head.load(std::memory_order_acquire);
+
+    for (auto node = tail; node != end;) {
+      node = node->next.load(std::memory_order_acquire);
+
+      // The producer that swapped in `end` (or an earlier node) may not
+      // have linked it yet; stop at the first unlinked node.
+      if (node == nullptr) {
+        return;
+      }
+
+      f(node->element);
+    }
+  }
+
+  // Single consumer only.
+  //
+  // Returns true when no node is linked after `tail` and no producer
+  // has swapped a new node into `head`.
+  bool empty()
+  {
+    return tail->next.load(std::memory_order_relaxed) == nullptr &&
+      head.load(std::memory_order_relaxed) == tail;
+  }
+
+private:
+  // Producers swap newly enqueued nodes into `head`.
+  std::atomic<Node<T>*> head;
+
+  // The current stub node; only read and written by the consumer.
+  // Aligned to 128 bytes (PowerPC's cache-line size, a multiple of
+  // x86's 64) so that it does not share a cache line with `head`.
+  //
+  // TODO(drexin): Programmatically get the cache line size.
+  alignas(128) Node<T>* tail;
+};
+
+} // namespace process {
+
+#endif // __MPSC_LINKED_QUEUE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
index 25a34f9..5814bc6 100644
--- a/3rdparty/libprocess/src/tests/CMakeLists.txt
+++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
@@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
limiter_tests.cpp
loop_tests.cpp
metrics_tests.cpp
+ mpsc_linked_queue_tests.cpp
mutex_tests.cpp
owned_tests.cpp
process_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 2ec0d42..e8ef21f 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -22,6 +22,7 @@
#include <iostream>
#include <memory>
#include <string>
+#include <thread>
#include <vector>
#include <process/collect.hpp>
@@ -40,6 +41,8 @@
#include "benchmarks.pb.h"
+#include "mpsc_linked_queue.hpp"
+
namespace http = process::http;
using process::CountDownLatch;
@@ -567,7 +570,6 @@ private:
long count = 0;
};
-
TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
{
constexpr long repeats = 100000;
@@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
process.run(num_submessages);
}
}
+
+
+TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
+{
+  // NOTE: we set the total number of producers to be 1 less than the
+  // hardware concurrency so the consumer doesn't have to fight for
+  // processing time with the producers. `hardware_concurrency()` returns
+  // 0 when the value is not computable, so guard against the unsigned
+  // underflow of `0 - 1` and always run at least one producer.
+  const unsigned int hardwareConcurrency =
+    std::thread::hardware_concurrency();
+  const unsigned int producerCount =
+    hardwareConcurrency > 1 ? hardwareConcurrency - 1 : 1;
+
+  // `static` so the producer lambdas can read it without capturing it.
+  static constexpr int messageCount = 10000000;
+
+  // Use a 64-bit total: `messageCount * producerCount` no longer fits in
+  // an `int` once there are more than 214 producers.
+  const uint64_t totalCount =
+    static_cast<uint64_t>(messageCount) * producerCount;
+
+  // Every producer enqueues this same element, which is never
+  // dereferenced; it is deleted once the consumer has drained `q`.
+  std::string* s = new std::string("");
+  process::MpscLinkedQueue<std::string> q;
+
+  Stopwatch consumerWatch;
+
+  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
+    consumerWatch.start();
+    for (uint64_t remaining = totalCount; remaining > 0;) {
+      if (q.dequeue() != nullptr) {
+        remaining--;
+      }
+    }
+    consumerWatch.stop();
+  });
+
+  std::vector<std::thread> producers;
+  producers.reserve(producerCount);
+
+  Stopwatch producerWatch;
+  producerWatch.start();
+
+  for (unsigned int t = 0; t < producerCount; t++) {
+    producers.emplace_back([s, &q]() {
+      for (int i = 0; i < messageCount; i++) {
+        q.enqueue(s);
+      }
+    });
+  }
+
+  for (std::thread& producer : producers) {
+    producer.join();
+  }
+
+  producerWatch.stop();
+
+  consumer.join();
+
+  // The consumer dequeued every element, so `q` no longer holds `s`
+  // (otherwise the queue's destructor would delete it a second time).
+  delete s;
+
+  Duration producerElapsed = producerWatch.elapsed();
+  Duration consumerElapsed = consumerWatch.elapsed();
+
+  double consumerThroughput =
+    static_cast<double>(totalCount) / consumerElapsed.secs();
+  double producerThroughput =
+    static_cast<double>(totalCount) / producerElapsed.secs();
+  double throughput = consumerThroughput + producerThroughput;
+
+  cout << "Estimated producer throughput (" << producerCount << " threads): "
+       << std::fixed << producerThroughput << " op/s" << endl;
+  cout << "Estimated consumer throughput: "
+       << std::fixed << consumerThroughput << " op/s" << endl;
+  cout << "Estimated total throughput: "
+       << std::fixed << throughput << " op/s" << endl;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
new file mode 100644
index 0000000..7699974
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
@@ -0,0 +1,104 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <thread>
+
+#include <stout/gtest.hpp>
+#include <stout/stringify.hpp>
+
+#include "mpsc_linked_queue.hpp"
+
+
+TEST(MpscLinkedQueueTest, EnqueueDequeue)
+{
+  process::MpscLinkedQueue<std::string> q;
+  std::string* s = new std::string("test");
+  q.enqueue(s);
+  std::string* s2 = q.dequeue();
+  ASSERT_EQ(s, s2);
+  delete s2;
+}
+
+
+TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
+{
+  process::MpscLinkedQueue<std::string> q;
+  for (int i = 0; i < 20; i++) {
+    q.enqueue(new std::string(stringify(i)));
+  }
+
+  // Elements come back in FIFO order.
+  for (int i = 0; i < 20; i++) {
+    std::string* s = q.dequeue();
+    ASSERT_TRUE(s != nullptr);
+    ASSERT_EQ(*s, stringify(i));
+    delete s;
+  }
+
+  // Everything that was enqueued has been dequeued.
+  ASSERT_TRUE(q.dequeue() == nullptr);
+}
+
+
+TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
+{
+  process::MpscLinkedQueue<std::string> q;
+  std::vector<std::thread> threads;
+  for (int t = 0; t < 5; t++) {
+    threads.push_back(
+        std::thread([t, &q]() {
+          int start = t * 1000;
+          int end = start + 1000;
+          for (int i = start; i < end; i++) {
+            q.enqueue(new std::string(stringify(i)));
+          }
+        }));
+  }
+
+  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
+    t.join();
+  });
+
+  std::set<std::string> elements;
+
+  // The dequeued strings are owned by the caller, so free them.
+  std::string* s = nullptr;
+  while ((s = q.dequeue()) != nullptr) {
+    elements.insert(*s);
+    delete s;
+  }
+
+  ASSERT_EQ(5000UL, elements.size());
+
+  for (int i = 0; i < 5000; i++) {
+    ASSERT_NE(elements.end(), elements.find(stringify(i)));
+  }
+}
+
+
+TEST(MpscLinkedQueueTest, ForEach)
+{
+  process::MpscLinkedQueue<std::string> q;
+  for (int i = 0; i < 20; i++) {
+    q.enqueue(new std::string(stringify(i)));
+  }
+  int i = 0;
+  q.for_each([&](std::string* s) {
+    ASSERT_EQ(*s, stringify(i++));
+  });
+
+  // Every element was visited exactly once. The elements are still
+  // enqueued and are deleted by the queue's destructor.
+  ASSERT_EQ(20, i);
+}
+
+
+TEST(MpscLinkedQueueTest, Empty)
+{
+  process::MpscLinkedQueue<std::string> q;
+  ASSERT_TRUE(q.empty());
+  std::string* s = new std::string("test");
+  q.enqueue(s);
+  ASSERT_FALSE(q.empty());
+  ASSERT_EQ(s, q.dequeue());
+  ASSERT_TRUE(q.empty());
+  delete s;
+}
Re: mesos git commit: Added mpsc_linked_queue and use it as the
concurrent event queue.
Posted by Dario Rexin <dr...@apple.com>.
By the way, I use 128 bytes here because PowerPC has 128-byte cache lines. We don’t depend on good performance on PPC, but since there has been some work on PPC support, I thought I’d make the layout work well there too. If you think we should default to the x86 cache-line size (64 bytes), I’ll change that as well.
> On Jul 16, 2018, at 8:39 AM, Dario Rexin <dr...@apple.com> wrote:
>
> Hi Benjamin,
>
> see comments inline.
>
>> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier <be...@mesosphere.io> wrote:
>>
>> Hi Dario,
>>
>> this patch introduced two new clang-tidy warnings. Could we try to get these down to zero, even if the code does not look bad?
>>
>>
>> I already created a patch for the unused lambda capture,
>>
>> https://reviews.apache.org/r/67927/
>>
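>> While the code does look reasonable, C++ (as a somewhat weird exception) allows a lambda to use some variables without capturing them. A `const` integer initialized with a constant expression, such as `messageCount` in the benchmark, is not odr-used when only its value is read, so an explicit capture of it is reported as unused.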
>>
>
> Yes, I was a bit confused by the MSVC error there. I added the explicit capture because I thought it would be preferable over implicit capture, but I’m fine with either. Thanks for creating the patch!
>
>>
>> I also looked into the warning about “excessive padding”. Adding some explicit padding seems to satisfy clang-tidy, but I wasn’t sure whether we just wanted to put `head` and `tail` on separate cache lines, or also cared about the padding added after `tail`.
>>
>> private:
>> std::atomic<Node<T>*> head;
>>
>> char padding[128 - sizeof(std::atomic<Node<T>*>)];
>>
>> // TODO(drexin): Programatically get the cache line size.
>> alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
>>
>> Could you put up a patch for that? You can run the linter yourself; it is `support/mesos-tidy.sh`.
>>
>
> That’s interesting. The padding after tail is a good point, we should definitely add that to prevent false sharing. If we add the padding, is alignas still necessary?
>
> Thanks,
> Dario
>
>>
>> Cheers,
>>
>> Benjamin
>>
>>
>>> On Jul 15, 2018, at 7:02 PM, benh@apache.org wrote:
>>>
>>> Repository: mesos
>>> Updated Branches:
>>> refs/heads/master a11a6a3d8 -> b1eafc035
>>>
>>>
>>> Added mpsc_linked_queue and use it as the concurrent event queue.
>>>
>>> https://reviews.apache.org/r/62515
>>>
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
>>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
>>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>>>
>>> Branch: refs/heads/master
>>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
>>> Parents: a11a6a3
>>> Author: Dario Rexin <dr...@apple.com>
>>> Authored: Sat Jul 7 13:20:22 2018 -0700
>>> Committer: Benjamin Hindman <be...@gmail.com>
>>> Committed: Sun Jul 15 09:55:28 2018 -0700
>>>
>>> ----------------------------------------------------------------------
>>> 3rdparty/libprocess/Makefile.am | 1 +
>>> 3rdparty/libprocess/src/event_queue.hpp | 168 ++---------------
>>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++
>>> 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
>>> 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++-
>>> .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++
>>> 6 files changed, 367 insertions(+), 150 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
>>> index 2d356aa..631491a 100644
>>> --- a/3rdparty/libprocess/Makefile.am
>>> +++ b/3rdparty/libprocess/Makefile.am
>>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = \
>>> src/tests/loop_tests.cpp \
>>> src/tests/main.cpp \
>>> src/tests/metrics_tests.cpp \
>>> + src/tests/mpsc_linked_queue_tests.cpp \
>>> src/tests/mutex_tests.cpp \
>>> src/tests/owned_tests.cpp \
>>> src/tests/process_tests.cpp \
>>>
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
>>> index 21c522d..999d552 100644
>>> --- a/3rdparty/libprocess/src/event_queue.hpp
>>> +++ b/3rdparty/libprocess/src/event_queue.hpp
>>> @@ -17,10 +17,6 @@
>>> #include <mutex>
>>> #include <string>
>>>
>>> -#ifdef LOCK_FREE_EVENT_QUEUE
>>> -#include <concurrentqueue.h>
>>> -#endif // LOCK_FREE_EVENT_QUEUE
>>> -
>>> #include <process/event.hpp>
>>> #include <process/http.hpp>
>>>
>>> @@ -28,6 +24,10 @@
>>> #include <stout/stringify.hpp>
>>> #include <stout/synchronized.hpp>
>>>
>>> +#ifdef LOCK_FREE_EVENT_QUEUE
>>> +#include "mpsc_linked_queue.hpp"
>>> +#endif // LOCK_FREE_EVENT_QUEUE
>>> +
>>> namespace process {
>>>
>>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
>>> @@ -187,185 +187,55 @@ private:
>>> #else // LOCK_FREE_EVENT_QUEUE
>>> void enqueue(Event* event)
>>> {
>>> - Item item = {sequence.fetch_add(1), event};
>>> if (comissioned.load()) {
>>> - queue.enqueue(std::move(item));
>>> + queue.enqueue(event);
>>> } else {
>>> - sequence.fetch_sub(1);
>>> delete event;
>>> }
>>> }
>>>
>>> Event* dequeue()
>>> {
>>> - // NOTE: for performance reasons we don't check `comissioned` here
>>> - // so it's possible that we'll loop forever if a consumer called
>>> - // `decomission()` and then subsequently called `dequeue()`.
>>> - Event* event = nullptr;
>>> - do {
>>> - // Given the nature of the concurrent queue implementation it's
>>> - // possible that we'll need to try to dequeue multiple times
>>> - // until it returns an event even though we know there is an
>>> - // event because the semantics are that we shouldn't call
>>> - // `dequeue()` before calling `empty()`.
>>> - event = try_dequeue();
>>> - } while (event == nullptr);
>>> - return event;
>>> + return queue.dequeue();
>>> }
>>>
>>> bool empty()
>>> {
>>> - // NOTE: for performance reasons we don't check `comissioned` here
>>> - // so it's possible that we'll return true when in fact we've been
>>> - // decomissioned and you shouldn't attempt to dequeue anything.
>>> - return (sequence.load() - next) == 0;
>>> + return queue.empty();
>>> }
>>>
>>> void decomission()
>>> {
>>> comissioned.store(true);
>>> while (!empty()) {
>>> - // NOTE: we use `try_dequeue()` here because we might be racing
>>> - // with `enqueue()` where they've already incremented `sequence`
>>> - // so we think there are more items to dequeue but they aren't
>>> - // actually going to enqueue anything because they've since seen
>>> - // `comissioned` is true. We'll attempt to dequeue with
>>> - // `try_dequeue()` and eventually they'll decrement `sequence`
>>> - // and so `empty()` will return true and we'll bail.
>>> - Event* event = try_dequeue();
>>> - if (event != nullptr) {
>>> - delete event;
>>> - }
>>> + delete dequeue();
>>> }
>>> }
>>>
>>> template <typename T>
>>> size_t count()
>>> {
>>> - // Try and dequeue more elements first!
>>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>>> -
>>> - return std::count_if(
>>> - items.begin(),
>>> - items.end(),
>>> - [](const Item& item) {
>>> - if (item.event != nullptr) {
>>> - return item.event->is<T>();
>>> - }
>>> - return false;
>>> - });
>>> + size_t count = 0;
>>> + queue.for_each([&count](Event* event) {
>>> + if (event->is<T>()) {
>>> + count++;
>>> + }
>>> + });
>>> + return count;
>>> }
>>>
>>> operator JSON::Array()
>>> {
>>> - // Try and dequeue more elements first!
>>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>>> -
>>> JSON::Array array;
>>> - foreach (const Item& item, items) {
>>> - if (item.event != nullptr) {
>>> - array.values.push_back(JSON::Object(*item.event));
>>> - }
>>> - }
>>> + queue.for_each([&array](Event* event) {
>>> + array.values.push_back(JSON::Object(*event));
>>> + });
>>>
>>> return array;
>>> }
>>>
>>> - struct Item
>>> - {
>>> - uint64_t sequence;
>>> - Event* event;
>>> - };
>>> -
>>> - Event* try_dequeue()
>>> - {
>>> - // The general algoritm here is as follows: we bulk dequeue as
>>> - // many items from the concurrent queue as possible. We then look
>>> - // for the `next` item in the sequence hoping that it's at the
>>> - // beginning of `items` but because the `queue` is not
>>> - // linearizable it might be "out of order". If we find it out of
>>> - // order we effectively dequeue it but leave it in `items` so as
>>> - // not to incur any costly rearrangements/compactions in
>>> - // `items`. We'll later pop the out of order items once they get
>>> - // to the front.
>>> -
>>> - // Start by popping any items that we effectively dequeued but
>>> - // didn't remove from `items` so as not to incur costly
>>> - // rearragements/compactions.
>>> - while (!items.empty() && next > items.front().sequence) {
>>> - items.pop_front();
>>> - }
>>> -
>>> - // Optimistically let's hope that the next item is at the front of
>>> - // `item`. If so, pop the item, increment `next`, and return the
>>> - // event.
>>> - if (!items.empty() && items.front().sequence == next) {
>>> - Event* event = items.front().event;
>>> - items.pop_front();
>>> - next += 1;
>>> - return event;
>>> - }
>>> -
>>> - size_t index = 0;
>>> -
>>> - do {
>>> - // Now look for a potentially out of order item. If found,
>>> - // signifiy the item has been dequeued by nulling the event
>>> - // (necessary for the implementation of `count()` and `operator
>>> - // JSON::Array()`) and return the event.
>>> - for (; index < items.size(); index++) {
>>> - if (items[index].sequence == next) {
>>> - Event* event = items[index].event;
>>> - items[index].event = nullptr;
>>> - next += 1;
>>> - return event;
>>> - }
>>> - }
>>> -
>>> - // If we can bulk dequeue more items then keep looking for the
>>> - // out of order event!
>>> - //
>>> - // NOTE: we use the _small_ value of `4` to dequeue here since
>>> - // in the presence of enough events being enqueued we could end
>>> - // up spending a LONG time dequeuing here! Since the next event
>>> - // in the sequence should really be close to the top of the
>>> - // queue we use a small value to dequeue.
>>> - //
>>> - // The intuition here is this: the faster we can return the next
>>> - // event the faster that event can get processed and the faster
>>> - // it might generate other events that can get processed in
>>> - // parallel by other threads and the more work we get done.
>>> - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
>>> -
>>> - return nullptr;
>>> - }
>>> -
>>> // Underlying queue of items.
>>> - moodycamel::ConcurrentQueue<Item> queue;
>>> -
>>> - // Counter to represent the item sequence. Note that we use a
>>> - // unsigned 64-bit integer which means that even if we were adding
>>> - // one item to the queue every nanosecond we'd be able to run for
>>> - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
>>> - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
>>> -
>>> - // Counter to represent the next item we expect to dequeue. Note
>>> - // that we don't need to make this be atomic because only a single
>>> - // consumer is ever reading or writing this variable!
>>> - uint64_t next = 0;
>>> -
>>> - // Collection of bulk dequeued items that may be out of order. Note
>>> - // that like `next` this will only ever be read/written by a single
>>> - // consumer.
>>> - //
>>> - // The use of a deque was explicit because it is implemented as an
>>> - // array of arrays (or vector of vectors) which usually gives good
>>> - // performance for appending to the back and popping from the front
>>> - // which is exactly what we need to do. To avoid any performance
>>> - // issues that might be incurred we do not remove any items from the
>>> - // middle of the deque (see comments in `try_dequeue()` above for
>>> - // more details).
>>> - std::deque<Item> items;
>>> + MpscLinkedQueue<Event> queue;
>>>
>>> // Whether or not the event queue has been decomissioned. This must
>>> // be atomic as it can be read by a producer even though it's only
>>>
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> new file mode 100644
>>> index 0000000..48c9509
>>> --- /dev/null
>>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> @@ -0,0 +1,179 @@
>>> +// Licensed under the Apache License, Version 2.0 (the "License");
>>> +// you may not use this file except in compliance with the License.
>>> +// You may obtain a copy of the License at
>>> +//
>>> +// http://www.apache.org/licenses/LICENSE-2.0
>>> +//
>>> +// Unless required by applicable law or agreed to in writing, software
>>> +// distributed under the License is distributed on an "AS IS" BASIS,
>>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> +// See the License for the specific language governing permissions and
>>> +// limitations under the License
>>> +
>>> +#ifndef __MPSC_LINKED_QUEUE_HPP__
>>> +#define __MPSC_LINKED_QUEUE_HPP__
>>> +
>>> +#include <atomic>
>>> +#include <functional>
>>> +
>>> +#include <glog/logging.h>
>>> +
>>> +namespace process {
>>> +
>>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
>>> +// the core methods:
>>> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
>>> +//
>>> +// which is a Java port of the MPSC algorithm as presented in following article:
>>> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
>>> +//
>>> +// The queue has following properties:
>>> +// Producers are wait-free (one atomic exchange per enqueue)
>>> +// Consumer is
>>> +// - lock-free
>>> +// - mostly wait-free, except when consumer reaches the end of the queue
>>> +// and producer enqueued a new node, but did not update the next pointer
>>> +// on the old node, yet
>>> +template <typename T>
>>> +class MpscLinkedQueue
>>> +{
>>> +private:
>>> + template <typename E>
>>> + struct Node
>>> + {
>>> + public:
>>> + explicit Node(E* element = nullptr) : element(element) {}
>>> +
>>> + E* element;
>>> + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
>>> + };
>>> +
>>> +public:
>>> + MpscLinkedQueue()
>>> + {
>>> + tail = new Node<T>();
>>> + head.store(tail);
>>> + }
>>> +
>>> + ~MpscLinkedQueue()
>>> + {
>>> + while (auto element = dequeue()) {
>>> + delete element;
>>> + }
>>> +
>>> + delete tail;
>>> + }
>>> +
>>> + // Multi producer safe.
>>> + void enqueue(T* element)
>>> + {
>>> + // A `nullptr` is used to denote an empty queue when doing a
>>> + // `dequeue()` so producers can't use it as an element.
>>> + CHECK_NOTNULL(element);
>>> +
>>> + auto newNode = new Node<T>(element);
>>> +
>>> + // Exchange is guaranteed to only give the old value to one
>>> + // producer, so this is safe and wait-free.
>>> + auto oldhead = head.exchange(newNode, std::memory_order_release);
>>> +
>>> + // At this point if this thread context switches out we may block
>>> + // the consumer from doing a dequeue (see below). Eventually we'll
>>> + // unblock the consumer once we run again and execute the next
>>> + // line of code.
>>> + oldhead->next.store(newNode, std::memory_order_release);
>>> + }
>>> +
>>> + // Single consumer only.
>>> + T* dequeue()
>>> + {
>>> + auto currentTail = tail;
>>> +
>>> + // Check and see if there is an actual element linked from `tail`
>>> + // since we use `tail` as a "stub" rather than the actual element.
>>> + auto nextTail = currentTail->next.exchange(
>>> + nullptr,
>>> + std::memory_order_relaxed);
>>> +
>>> + // There are three possible cases here:
>>> + //
>>> + // (1) The queue is empty.
>>> + // (2) The queue appears empty but a producer is still enqueuing
>>> + // so let's wait for it and then dequeue.
>>> + // (3) We have something to dequeue.
>>> + //
>>> + // Start by checking if the queue is or appears empty.
>>> + if (nextTail == nullptr) {
>>> + // Now check if the queue is actually empty or just appears
>>> + // empty. If it's actually empty then return `nullptr` to denote
>>> + // emptiness.
>>> + if (head.load(std::memory_order_relaxed) == tail) {
>>> + return nullptr;
>>> + }
>>> +
>>> + // Another thread already inserted a new node, but did not
>>> + // connect it to the tail, yet, so we spin-wait. At this point
>>> + // we are not wait-free anymore.
>>> + do {
>>> + nextTail = currentTail->next.exchange(
>>> + nullptr,
>>> + std::memory_order_relaxed);
>>> + } while (nextTail == nullptr);
>>> + }
>>> +
>>> + CHECK_NOTNULL(nextTail);
>>> +
>>> + auto element = nextTail->element;
>>> + nextTail->element = nullptr;
>>> +
>>> + tail = nextTail;
>>> + delete currentTail;
>>> +
>>> + return element;
>>> + }
>>> +
>>> + // Single consumer only.
>>> + //
>>> + // TODO(drexin): Provide C++ style iteration so someone can just use
>>> + // the `std::for_each()`.
>>> + template <typename F>
>>> + void for_each(F&& f)
>>> + {
>>> + auto end = head.load();
>>> + auto node = tail;
>>> +
>>> + for (;;) {
>>> + node = node->next.load();
>>> +
>>> + // We are following the linked structure until we reach the end
>>> + // node. There is a race with new nodes being added, so we limit
>>> + // the traversal to the last node at the time we started.
>>> + if (node == nullptr) {
>>> + return;
>>> + }
>>> +
>>> + f(node->element);
>>> +
>>> + if (node == end) {
>>> + return;
>>> + }
>>> + }
>>> + }
>>> +
>>> + // Single consumer only.
>>> + bool empty()
>>> + {
>>> + return tail->next.load(std::memory_order_relaxed) == nullptr &&
>>> + head.load(std::memory_order_relaxed) == tail;
>>> + }
>>> +
>>> +private:
>>> + std::atomic<Node<T>*> head;
>>> +
>>> + // TODO(drexin): Programatically get the cache line size.
>>> + alignas(128) Node<T>* tail;
>>> +};
>>> +
>>> +} // namespace process {
>>> +
>>> +#endif // __MPSC_LINKED_QUEUE_HPP__
>>>
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> index 25a34f9..5814bc6 100644
>>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>>> limiter_tests.cpp
>>> loop_tests.cpp
>>> metrics_tests.cpp
>>> + mpsc_linked_queue_tests.cpp
>>> mutex_tests.cpp
>>> owned_tests.cpp
>>> process_tests.cpp
>>>
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> index 2ec0d42..e8ef21f 100644
>>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> @@ -22,6 +22,7 @@
>>> #include <iostream>
>>> #include <memory>
>>> #include <string>
>>> +#include <thread>
>>> #include <vector>
>>>
>>> #include <process/collect.hpp>
>>> @@ -40,6 +41,8 @@
>>>
>>> #include "benchmarks.pb.h"
>>>
>>> +#include "mpsc_linked_queue.hpp"
>>> +
>>> namespace http = process::http;
>>>
>>> using process::CountDownLatch;
>>> @@ -567,7 +570,6 @@ private:
>>> long count = 0;
>>> };
>>>
>>> -
>>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
>>> {
>>> constexpr long repeats = 100000;
>>> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>>> process.run(num_submessages);
>>> }
>>> }
>>> +
>>> +
>>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
>>> +{
>>> + // NOTE: we set the total number of producers to be 1 less than the
>>> + // hardware concurrency so the consumer doesn't have to fight for
>>> + // processing time with the producers.
>>> + const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
>>> + const int messageCount = 10000000;
>>> + const int totalCount = messageCount * producerCount;
>>> + std::string* s = new std::string("");
>>> + process::MpscLinkedQueue<std::string> q;
>>> +
>>> + Stopwatch consumerWatch;
>>> +
>>> + auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
>>> + consumerWatch.start();
>>> + for (int i = totalCount; i > 0;) {
>>> + if (q.dequeue() != nullptr) {
>>> + i--;
>>> + }
>>> + }
>>> + consumerWatch.stop();
>>> + });
>>> +
>>> + std::vector<std::thread> producers;
>>> +
>>> + Stopwatch producerWatch;
>>> + producerWatch.start();
>>> +
>>> + for (unsigned int t = 0; t < producerCount; t++) {
>>> + producers.push_back(std::thread([messageCount, s, &q]() {
>>> + for (int i = 0; i < messageCount; i++) {
>>> + q.enqueue(s);
>>> + }
>>> + }));
>>> + }
>>> +
>>> + for (std::thread& producer : producers) {
>>> + producer.join();
>>> + }
>>> +
>>> + producerWatch.stop();
>>> +
>>> + consumer.join();
>>> +
>>> + Duration producerElapsed = producerWatch.elapsed();
>>> + Duration consumerElapsed = consumerWatch.elapsed();
>>> +
>>> + double consumerThroughput = (double) totalCount / consumerElapsed.secs();
>>> + double producerThroughput = (double) totalCount / producerElapsed.secs();
>>> + double throughput = consumerThroughput + producerThroughput;
>>> +
>>> + cout << "Estimated producer throughput (" << producerCount << " threads): "
>>> + << std::fixed << producerThroughput << " op/s" << endl;
>>> + cout << "Estimated consumer throughput: "
>>> + << std::fixed << consumerThroughput << " op/s" << endl;
>>> + cout << "Estimated total throughput: "
>>> + << std::fixed << throughput << " op/s" << endl;
>>> +}
>>>
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> new file mode 100644
>>> index 0000000..7699974
>>> --- /dev/null
>>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> @@ -0,0 +1,104 @@
>>> +// Licensed under the Apache License, Version 2.0 (the "License");
>>> +// you may not use this file except in compliance with the License.
>>> +// You may obtain a copy of the License at
>>> +//
>>> +// http://www.apache.org/licenses/LICENSE-2.0
>>> +//
>>> +// Unless required by applicable law or agreed to in writing, software
>>> +// distributed under the License is distributed on an "AS IS" BASIS,
>>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> +// See the License for the specific language governing permissions and
>>> +// limitations under the License
>>> +
>>> +#include <thread>
>>> +
>>> +#include <stout/gtest.hpp>
>>> +#include <stout/stringify.hpp>
>>> +
>>> +#include "mpsc_linked_queue.hpp"
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
>>> +{
>>> + process::MpscLinkedQueue<std::string> q;
>>> + std::string* s = new std::string("test");
>>> + q.enqueue(s);
>>> + std::string* s2 = q.dequeue();
>>> + ASSERT_EQ(s, s2);
>>> + delete s2;
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
>>> +{
>>> + process::MpscLinkedQueue<std::string> q;
>>> + for (int i = 0; i < 20; i++) {
>>> + q.enqueue(new std::string(stringify(i)));
>>> + }
>>> +
>>> + for (int i = 0; i < 20; i++) {
>>> + std::string* s = q.dequeue();
>>> + ASSERT_EQ(*s, stringify(i));
>>> + delete s;
>>> + }
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
>>> +{
>>> + process::MpscLinkedQueue<std::string> q;
>>> + std::vector<std::thread> threads;
>>> + for (int t = 0; t < 5; t++) {
>>> + threads.push_back(
>>> + std::thread([t, &q]() {
>>> + int start = t * 1000;
>>> + int end = start + 1000;
>>> + for (int i = start; i < end; i++) {
>>> + q.enqueue(new std::string(stringify(i)));
>>> + }
>>> + }));
>>> + }
>>> +
>>> + std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
>>> + t.join();
>>> + });
>>> +
>>> + std::set<std::string> elements;
>>> +
>>> + std::string* s = nullptr;
>>> + while ((s = q.dequeue()) != nullptr) {
>>> + elements.insert(*s);
>>> + }
>>> +
>>> + ASSERT_EQ(5000UL, elements.size());
>>> +
>>> + for (int i = 0; i < 5000; i++) {
>>> + ASSERT_NE(elements.end(), elements.find(stringify(i)));
>>> + }
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, ForEach)
>>> +{
>>> + process::MpscLinkedQueue<std::string> q;
>>> + for (int i = 0; i < 20; i++) {
>>> + q.enqueue(new std::string(stringify(i)));
>>> + }
>>> + int i = 0;
>>> + q.for_each([&](std::string* s) {
>>> + ASSERT_EQ(*s, stringify(i++));
>>> + });
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, Empty)
>>> +{
>>> + process::MpscLinkedQueue<std::string> q;
>>> + ASSERT_TRUE(q.empty());
>>> + std::string* s = new std::string("test");
>>> + q.enqueue(s);
>>> + ASSERT_FALSE(q.empty());
>>> + q.dequeue();
>>> + ASSERT_TRUE(q.empty());
>>> + delete s;
>>> +}
>>>
>>
>
Re: mesos git commit: Added mpsc_linked_queue and use it as the
concurrent event queue.
Posted by Dario Rexin <dr...@apple.com>.
Hi Benjamin,
see comments inline.
> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier <be...@mesosphere.io> wrote:
>
> Hi Dario,
>
> this patch introduced two new clang-tidy warnings. Could we try to get these down to zero, even if the code does not look bad?
>
>
> I already created a patch for the unused lambda capture,
>
> https://reviews.apache.org/r/67927/
>
> While the code does look reasonable, as a somewhat weird exception C++ allows referencing some variables without capturing them.
>
Yes, I was a bit confused by the MSVC error there. I added the explicit capture because I thought it would be preferable to implicit capture, but I’m fine with either. Thanks for creating the patch!
>
> I also looked into the warning on the “excessive padding”. Adding some explicit padding seems to make clang-tidy content, but I wasn’t sure whether we just wanted to put `head` and `tail` on separate cache lines, or also cared about the padding added after `tail`.
>
> private:
> std::atomic<Node<T>*> head;
>
> char padding[128 - sizeof(std::atomic<Node<T>*>)];
>
> // TODO(drexin): Programatically get the cache line size.
> alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
>
> Could you put up a patch for that? You can run the linter yourself; it is `support/mesos-tidy.sh`.
>
That’s interesting. The padding after tail is a good point, we should definitely add that to prevent false sharing. If we add the padding, is alignas still necessary?
Thanks,
Dario
>
> Cheers,
>
> Benjamin
>
>
>> On Jul 15, 2018, at 7:02 PM, benh@apache.org wrote:
>>
>> Repository: mesos
>> Updated Branches:
>> refs/heads/master a11a6a3d8 -> b1eafc035
>>
>>
>> Added mpsc_linked_queue and use it as the concurrent event queue.
>>
>> https://reviews.apache.org/r/62515
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>>
>> Branch: refs/heads/master
>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
>> Parents: a11a6a3
>> Author: Dario Rexin <dr...@apple.com>
>> Authored: Sat Jul 7 13:20:22 2018 -0700
>> Committer: Benjamin Hindman <be...@gmail.com>
>> Committed: Sun Jul 15 09:55:28 2018 -0700
>>
>> ----------------------------------------------------------------------
>> 3rdparty/libprocess/Makefile.am | 1 +
>> 3rdparty/libprocess/src/event_queue.hpp | 168 ++---------------
>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++
>> 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
>> 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++-
>> .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++
>> 6 files changed, 367 insertions(+), 150 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
>> index 2d356aa..631491a 100644
>> --- a/3rdparty/libprocess/Makefile.am
>> +++ b/3rdparty/libprocess/Makefile.am
>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = \
>> src/tests/loop_tests.cpp \
>> src/tests/main.cpp \
>> src/tests/metrics_tests.cpp \
>> + src/tests/mpsc_linked_queue_tests.cpp \
>> src/tests/mutex_tests.cpp \
>> src/tests/owned_tests.cpp \
>> src/tests/process_tests.cpp \
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
>> index 21c522d..999d552 100644
>> --- a/3rdparty/libprocess/src/event_queue.hpp
>> +++ b/3rdparty/libprocess/src/event_queue.hpp
>> @@ -17,10 +17,6 @@
>> #include <mutex>
>> #include <string>
>>
>> -#ifdef LOCK_FREE_EVENT_QUEUE
>> -#include <concurrentqueue.h>
>> -#endif // LOCK_FREE_EVENT_QUEUE
>> -
>> #include <process/event.hpp>
>> #include <process/http.hpp>
>>
>> @@ -28,6 +24,10 @@
>> #include <stout/stringify.hpp>
>> #include <stout/synchronized.hpp>
>>
>> +#ifdef LOCK_FREE_EVENT_QUEUE
>> +#include "mpsc_linked_queue.hpp"
>> +#endif // LOCK_FREE_EVENT_QUEUE
>> +
>> namespace process {
>>
>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
>> @@ -187,185 +187,55 @@ private:
>> #else // LOCK_FREE_EVENT_QUEUE
>> void enqueue(Event* event)
>> {
>> - Item item = {sequence.fetch_add(1), event};
>> if (comissioned.load()) {
>> - queue.enqueue(std::move(item));
>> + queue.enqueue(event);
>> } else {
>> - sequence.fetch_sub(1);
>> delete event;
>> }
>> }
>>
>> Event* dequeue()
>> {
>> - // NOTE: for performance reasons we don't check `comissioned` here
>> - // so it's possible that we'll loop forever if a consumer called
>> - // `decomission()` and then subsequently called `dequeue()`.
>> - Event* event = nullptr;
>> - do {
>> - // Given the nature of the concurrent queue implementation it's
>> - // possible that we'll need to try to dequeue multiple times
>> - // until it returns an event even though we know there is an
>> - // event because the semantics are that we shouldn't call
>> - // `dequeue()` before calling `empty()`.
>> - event = try_dequeue();
>> - } while (event == nullptr);
>> - return event;
>> + return queue.dequeue();
>> }
>>
>> bool empty()
>> {
>> - // NOTE: for performance reasons we don't check `comissioned` here
>> - // so it's possible that we'll return true when in fact we've been
>> - // decomissioned and you shouldn't attempt to dequeue anything.
>> - return (sequence.load() - next) == 0;
>> + return queue.empty();
>> }
>>
>> void decomission()
>> {
>> comissioned.store(true);
>> while (!empty()) {
>> - // NOTE: we use `try_dequeue()` here because we might be racing
>> - // with `enqueue()` where they've already incremented `sequence`
>> - // so we think there are more items to dequeue but they aren't
>> - // actually going to enqueue anything because they've since seen
>> - // `comissioned` is true. We'll attempt to dequeue with
>> - // `try_dequeue()` and eventually they'll decrement `sequence`
>> - // and so `empty()` will return true and we'll bail.
>> - Event* event = try_dequeue();
>> - if (event != nullptr) {
>> - delete event;
>> - }
>> + delete dequeue();
>> }
>> }
>>
>> template <typename T>
>> size_t count()
>> {
>> - // Try and dequeue more elements first!
>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>> -
>> - return std::count_if(
>> - items.begin(),
>> - items.end(),
>> - [](const Item& item) {
>> - if (item.event != nullptr) {
>> - return item.event->is<T>();
>> - }
>> - return false;
>> - });
>> + size_t count = 0;
>> + queue.for_each([&count](Event* event) {
>> + if (event->is<T>()) {
>> + count++;
>> + }
>> + });
>> + return count;
>> }
>>
>> operator JSON::Array()
>> {
>> - // Try and dequeue more elements first!
>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>> -
>> JSON::Array array;
>> - foreach (const Item& item, items) {
>> - if (item.event != nullptr) {
>> - array.values.push_back(JSON::Object(*item.event));
>> - }
>> - }
>> + queue.for_each([&array](Event* event) {
>> + array.values.push_back(JSON::Object(*event));
>> + });
>>
>> return array;
>> }
>>
>> - struct Item
>> - {
>> - uint64_t sequence;
>> - Event* event;
>> - };
>> -
>> - Event* try_dequeue()
>> - {
>> - // The general algoritm here is as follows: we bulk dequeue as
>> - // many items from the concurrent queue as possible. We then look
>> - // for the `next` item in the sequence hoping that it's at the
>> - // beginning of `items` but because the `queue` is not
>> - // linearizable it might be "out of order". If we find it out of
>> - // order we effectively dequeue it but leave it in `items` so as
>> - // not to incur any costly rearrangements/compactions in
>> - // `items`. We'll later pop the out of order items once they get
>> - // to the front.
>> -
>> - // Start by popping any items that we effectively dequeued but
>> - // didn't remove from `items` so as not to incur costly
>> - // rearragements/compactions.
>> - while (!items.empty() && next > items.front().sequence) {
>> - items.pop_front();
>> - }
>> -
>> - // Optimistically let's hope that the next item is at the front of
>> - // `item`. If so, pop the item, increment `next`, and return the
>> - // event.
>> - if (!items.empty() && items.front().sequence == next) {
>> - Event* event = items.front().event;
>> - items.pop_front();
>> - next += 1;
>> - return event;
>> - }
>> -
>> - size_t index = 0;
>> -
>> - do {
>> - // Now look for a potentially out of order item. If found,
>> - // signifiy the item has been dequeued by nulling the event
>> - // (necessary for the implementation of `count()` and `operator
>> - // JSON::Array()`) and return the event.
>> - for (; index < items.size(); index++) {
>> - if (items[index].sequence == next) {
>> - Event* event = items[index].event;
>> - items[index].event = nullptr;
>> - next += 1;
>> - return event;
>> - }
>> - }
>> -
>> - // If we can bulk dequeue more items then keep looking for the
>> - // out of order event!
>> - //
>> - // NOTE: we use the _small_ value of `4` to dequeue here since
>> - // in the presence of enough events being enqueued we could end
>> - // up spending a LONG time dequeuing here! Since the next event
>> - // in the sequence should really be close to the top of the
>> - // queue we use a small value to dequeue.
>> - //
>> - // The intuition here is this: the faster we can return the next
>> - // event the faster that event can get processed and the faster
>> - // it might generate other events that can get processed in
>> - // parallel by other threads and the more work we get done.
>> - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
>> -
>> - return nullptr;
>> - }
>> -
>> // Underlying queue of items.
>> - moodycamel::ConcurrentQueue<Item> queue;
>> -
>> - // Counter to represent the item sequence. Note that we use a
>> - // unsigned 64-bit integer which means that even if we were adding
>> - // one item to the queue every nanosecond we'd be able to run for
>> - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
>> - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
>> -
>> - // Counter to represent the next item we expect to dequeue. Note
>> - // that we don't need to make this be atomic because only a single
>> - // consumer is ever reading or writing this variable!
>> - uint64_t next = 0;
>> -
>> - // Collection of bulk dequeued items that may be out of order. Note
>> - // that like `next` this will only ever be read/written by a single
>> - // consumer.
>> - //
>> - // The use of a deque was explicit because it is implemented as an
>> - // array of arrays (or vector of vectors) which usually gives good
>> - // performance for appending to the back and popping from the front
>> - // which is exactly what we need to do. To avoid any performance
>> - // issues that might be incurred we do not remove any items from the
>> - // middle of the deque (see comments in `try_dequeue()` above for
>> - // more details).
>> - std::deque<Item> items;
>> + MpscLinkedQueue<Event> queue;
>>
>> // Whether or not the event queue has been decomissioned. This must
>> // be atomic as it can be read by a producer even though it's only
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> new file mode 100644
>> index 0000000..48c9509
>> --- /dev/null
>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> @@ -0,0 +1,179 @@
>> +// Licensed under the Apache License, Version 2.0 (the "License");
>> +// you may not use this file except in compliance with the License.
>> +// You may obtain a copy of the License at
>> +//
>> +// http://www.apache.org/licenses/LICENSE-2.0
>> +//
>> +// Unless required by applicable law or agreed to in writing, software
>> +// distributed under the License is distributed on an "AS IS" BASIS,
>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +// See the License for the specific language governing permissions and
>> +// limitations under the License
>> +
>> +#ifndef __MPSC_LINKED_QUEUE_HPP__
>> +#define __MPSC_LINKED_QUEUE_HPP__
>> +
>> +#include <atomic>
>> +#include <functional>
>> +
>> +#include <glog/logging.h>
>> +
>> +namespace process {
>> +
>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
>> +// the core methods:
>> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
>> +//
>> +// which is a Java port of the MPSC algorithm as presented in following article:
>> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
>> +//
>> +// The queue has following properties:
>> +// Producers are wait-free (one atomic exchange per enqueue)
>> +// Consumer is
>> +// - lock-free
>> +// - mostly wait-free, except when consumer reaches the end of the queue
>> +// and producer enqueued a new node, but did not update the next pointer
>> +// on the old node, yet
>> +template <typename T>
>> +class MpscLinkedQueue
>> +{
>> +private:
>> + template <typename E>
>> + struct Node
>> + {
>> + public:
>> + explicit Node(E* element = nullptr) : element(element) {}
>> +
>> + E* element;
>> + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
>> + };
>> +
>> +public:
>> + MpscLinkedQueue()
>> + {
>> + tail = new Node<T>();
>> + head.store(tail);
>> + }
>> +
>> + ~MpscLinkedQueue()
>> + {
>> + while (auto element = dequeue()) {
>> + delete element;
>> + }
>> +
>> + delete tail;
>> + }
>> +
>> + // Multi producer safe.
>> + void enqueue(T* element)
>> + {
>> + // A `nullptr` is used to denote an empty queue when doing a
>> + // `dequeue()` so producers can't use it as an element.
>> + CHECK_NOTNULL(element);
>> +
>> + auto newNode = new Node<T>(element);
>> +
>> + // Exchange is guaranteed to only give the old value to one
>> + // producer, so this is safe and wait-free.
>> + auto oldhead = head.exchange(newNode, std::memory_order_release);
>> +
>> + // At this point if this thread context switches out we may block
>> + // the consumer from doing a dequeue (see below). Eventually we'll
>> + // unblock the consumer once we run again and execute the next
>> + // line of code.
>> + oldhead->next.store(newNode, std::memory_order_release);
>> + }
>> +
>> + // Single consumer only.
>> + T* dequeue()
>> + {
>> + auto currentTail = tail;
>> +
>> + // Check and see if there is an actual element linked from `tail`
>> + // since we use `tail` as a "stub" rather than the actual element.
>> + auto nextTail = currentTail->next.exchange(
>> + nullptr,
>> + std::memory_order_relaxed);
>> +
>> + // There are three possible cases here:
>> + //
>> + // (1) The queue is empty.
>> + // (2) The queue appears empty but a producer is still enqueuing
>> + // so let's wait for it and then dequeue.
>> + // (3) We have something to dequeue.
>> + //
>> + // Start by checking if the queue is or appears empty.
>> + if (nextTail == nullptr) {
>> + // Now check if the queue is actually empty or just appears
>> + // empty. If it's actually empty then return `nullptr` to denote
>> + // emptiness.
>> + if (head.load(std::memory_order_relaxed) == tail) {
>> + return nullptr;
>> + }
>> +
>> + // Another thread already inserted a new node, but did not
>> + // connect it to the tail, yet, so we spin-wait. At this point
>> + // we are not wait-free anymore.
>> + do {
>> + nextTail = currentTail->next.exchange(
>> + nullptr,
>> + std::memory_order_relaxed);
>> + } while (nextTail == nullptr);
>> + }
>> +
>> + CHECK_NOTNULL(nextTail);
>> +
>> + auto element = nextTail->element;
>> + nextTail->element = nullptr;
>> +
>> + tail = nextTail;
>> + delete currentTail;
>> +
>> + return element;
>> + }
>> +
>> + // Single consumer only.
>> + //
>> + // TODO(drexin): Provide C++ style iteration so someone can just use
>> + // the `std::for_each()`.
>> + template <typename F>
>> + void for_each(F&& f)
>> + {
>> + auto end = head.load();
>> + auto node = tail;
>> +
>> + for (;;) {
>> + node = node->next.load();
>> +
>> + // We are following the linked structure until we reach the end
>> + // node. There is a race with new nodes being added, so we limit
>> + // the traversal to the last node at the time we started.
>> + if (node == nullptr) {
>> + return;
>> + }
>> +
>> + f(node->element);
>> +
>> + if (node == end) {
>> + return;
>> + }
>> + }
>> + }
>> +
>> + // Single consumer only.
>> + bool empty()
>> + {
>> + return tail->next.load(std::memory_order_relaxed) == nullptr &&
>> + head.load(std::memory_order_relaxed) == tail;
>> + }
>> +
>> +private:
>> + std::atomic<Node<T>*> head;
>> +
>> + // TODO(drexin): Programatically get the cache line size.
>> + alignas(128) Node<T>* tail;
>> +};
>> +
>> +} // namespace process {
>> +
>> +#endif // __MPSC_LINKED_QUEUE_HPP__
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
>> index 25a34f9..5814bc6 100644
>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>> limiter_tests.cpp
>> loop_tests.cpp
>> metrics_tests.cpp
>> + mpsc_linked_queue_tests.cpp
>> mutex_tests.cpp
>> owned_tests.cpp
>> process_tests.cpp
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> index 2ec0d42..e8ef21f 100644
>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> @@ -22,6 +22,7 @@
>> #include <iostream>
>> #include <memory>
>> #include <string>
>> +#include <thread>
>> #include <vector>
>>
>> #include <process/collect.hpp>
>> @@ -40,6 +41,8 @@
>>
>> #include "benchmarks.pb.h"
>>
>> +#include "mpsc_linked_queue.hpp"
>> +
>> namespace http = process::http;
>>
>> using process::CountDownLatch;
>> @@ -567,7 +570,6 @@ private:
>> long count = 0;
>> };
>>
>> -
>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
>> {
>> constexpr long repeats = 100000;
>> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>> process.run(num_submessages);
>> }
>> }
>> +
>> +
>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
>> +{
>> + // NOTE: we set the total number of producers to be 1 less than the
>> + // hardware concurrency so the consumer doesn't have to fight for
>> + // processing time with the producers.
>> + const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
>> + const int messageCount = 10000000;
>> + const int totalCount = messageCount * producerCount;
>> + std::string* s = new std::string("");
>> + process::MpscLinkedQueue<std::string> q;
>> +
>> + Stopwatch consumerWatch;
>> +
>> + auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
>> + consumerWatch.start();
>> + for (int i = totalCount; i > 0;) {
>> + if (q.dequeue() != nullptr) {
>> + i--;
>> + }
>> + }
>> + consumerWatch.stop();
>> + });
>> +
>> + std::vector<std::thread> producers;
>> +
>> + Stopwatch producerWatch;
>> + producerWatch.start();
>> +
>> + for (unsigned int t = 0; t < producerCount; t++) {
>> + producers.push_back(std::thread([messageCount, s, &q]() {
>> + for (int i = 0; i < messageCount; i++) {
>> + q.enqueue(s);
>> + }
>> + }));
>> + }
>> +
>> + for (std::thread& producer : producers) {
>> + producer.join();
>> + }
>> +
>> + producerWatch.stop();
>> +
>> + consumer.join();
>> +
>> + Duration producerElapsed = producerWatch.elapsed();
>> + Duration consumerElapsed = consumerWatch.elapsed();
>> +
>> + double consumerThroughput = (double) totalCount / consumerElapsed.secs();
>> + double producerThroughput = (double) totalCount / producerElapsed.secs();
>> + double throughput = consumerThroughput + producerThroughput;
>> +
>> + cout << "Estimated producer throughput (" << producerCount << " threads): "
>> + << std::fixed << producerThroughput << " op/s" << endl;
>> + cout << "Estimated consumer throughput: "
>> + << std::fixed << consumerThroughput << " op/s" << endl;
>> + cout << "Estimated total throughput: "
>> + << std::fixed << throughput << " op/s" << endl;
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> new file mode 100644
>> index 0000000..7699974
>> --- /dev/null
>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> @@ -0,0 +1,104 @@
>> +// Licensed under the Apache License, Version 2.0 (the "License");
>> +// you may not use this file except in compliance with the License.
>> +// You may obtain a copy of the License at
>> +//
>> +// http://www.apache.org/licenses/LICENSE-2.0
>> +//
>> +// Unless required by applicable law or agreed to in writing, software
>> +// distributed under the License is distributed on an "AS IS" BASIS,
>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +// See the License for the specific language governing permissions and
>> +// limitations under the License
>> +
>> +#include <thread>
>> +
>> +#include <stout/gtest.hpp>
>> +#include <stout/stringify.hpp>
>> +
>> +#include "mpsc_linked_queue.hpp"
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
>> +{
>> + process::MpscLinkedQueue<std::string> q;
>> + std::string* s = new std::string("test");
>> + q.enqueue(s);
>> + std::string* s2 = q.dequeue();
>> + ASSERT_EQ(s, s2);
>> + delete s2;
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
>> +{
>> + process::MpscLinkedQueue<std::string> q;
>> + for (int i = 0; i < 20; i++) {
>> + q.enqueue(new std::string(stringify(i)));
>> + }
>> +
>> + for (int i = 0; i < 20; i++) {
>> + std::string* s = q.dequeue();
>> + ASSERT_EQ(*s, stringify(i));
>> + delete s;
>> + }
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
>> +{
>> + process::MpscLinkedQueue<std::string> q;
>> + std::vector<std::thread> threads;
>> + for (int t = 0; t < 5; t++) {
>> + threads.push_back(
>> + std::thread([t, &q]() {
>> + int start = t * 1000;
>> + int end = start + 1000;
>> + for (int i = start; i < end; i++) {
>> + q.enqueue(new std::string(stringify(i)));
>> + }
>> + }));
>> + }
>> +
>> + std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
>> + t.join();
>> + });
>> +
>> + std::set<std::string> elements;
>> +
>> + std::string* s = nullptr;
>> + while ((s = q.dequeue()) != nullptr) {
>> + elements.insert(*s);
>> + }
>> +
>> + ASSERT_EQ(5000UL, elements.size());
>> +
>> + for (int i = 0; i < 5000; i++) {
>> + ASSERT_NE(elements.end(), elements.find(stringify(i)));
>> + }
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, ForEach)
>> +{
>> + process::MpscLinkedQueue<std::string> q;
>> + for (int i = 0; i < 20; i++) {
>> + q.enqueue(new std::string(stringify(i)));
>> + }
>> + int i = 0;
>> + q.for_each([&](std::string* s) {
>> + ASSERT_EQ(*s, stringify(i++));
>> + });
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, Empty)
>> +{
>> + process::MpscLinkedQueue<std::string> q;
>> + ASSERT_TRUE(q.empty());
>> + std::string* s = new std::string("test");
>> + q.enqueue(s);
>> + ASSERT_FALSE(q.empty());
>> + q.dequeue();
>> + ASSERT_TRUE(q.empty());
>> + delete s;
>> +}
>>
>
Re: mesos git commit: Added mpsc_linked_queue and use it as the
concurrent event queue.
Posted by Benjamin Bannier <be...@mesosphere.io>.
Hi Dario,
This patch introduced two new clang-tidy warnings. Could we try to get the count back down to zero, even though the flagged code itself does not look wrong?
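I already created a patch for the unused lambda capture:
https://reviews.apache.org/r/67927/
While the code does look reasonable, C++ has a somewhat surprising exception that lets a lambda use certain variables without capturing them. Here that is `messageCount` in the producer lambda of the new benchmark. It is a `const int` initialized with a constant expression, and only its value is read, so it is not odr-used. Capturing it explicitly is therefore redundant, and the capture gets flagged as unused.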
I also looked into the “excessive padding” warning. Adding some explicit padding seems to satisfy clang-tidy. However, I wasn’t sure whether we only want to put `head` and `tail` on separate cache lines, or whether we also care about the padding that `alignas(128)` adds after `tail`.
private:
std::atomic<Node<T>*> head;
char padding[128 - sizeof(std::atomic<Node<T>*>)];
// TODO(drexin): Programatically get the cache line size.
alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
Could you put up a patch for that? You can run the linter yourself with `support/mesos-tidy.sh`.
Cheers,
Benjamin
> On Jul 15, 2018, at 7:02 PM, benh@apache.org wrote:
>
> Repository: mesos
> Updated Branches:
> refs/heads/master a11a6a3d8 -> b1eafc035
>
>
> Added mpsc_linked_queue and use it as the concurrent event queue.
>
> https://reviews.apache.org/r/62515
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>
> Branch: refs/heads/master
> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
> Parents: a11a6a3
> Author: Dario Rexin <dr...@apple.com>
> Authored: Sat Jul 7 13:20:22 2018 -0700
> Committer: Benjamin Hindman <be...@gmail.com>
> Committed: Sun Jul 15 09:55:28 2018 -0700
>
> ----------------------------------------------------------------------
> 3rdparty/libprocess/Makefile.am | 1 +
> 3rdparty/libprocess/src/event_queue.hpp | 168 ++---------------
> 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++
> 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
> 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++-
> .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++
> 6 files changed, 367 insertions(+), 150 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
> index 2d356aa..631491a 100644
> --- a/3rdparty/libprocess/Makefile.am
> +++ b/3rdparty/libprocess/Makefile.am
> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = \
> src/tests/loop_tests.cpp \
> src/tests/main.cpp \
> src/tests/metrics_tests.cpp \
> + src/tests/mpsc_linked_queue_tests.cpp \
> src/tests/mutex_tests.cpp \
> src/tests/owned_tests.cpp \
> src/tests/process_tests.cpp \
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
> index 21c522d..999d552 100644
> --- a/3rdparty/libprocess/src/event_queue.hpp
> +++ b/3rdparty/libprocess/src/event_queue.hpp
> @@ -17,10 +17,6 @@
> #include <mutex>
> #include <string>
>
> -#ifdef LOCK_FREE_EVENT_QUEUE
> -#include <concurrentqueue.h>
> -#endif // LOCK_FREE_EVENT_QUEUE
> -
> #include <process/event.hpp>
> #include <process/http.hpp>
>
> @@ -28,6 +24,10 @@
> #include <stout/stringify.hpp>
> #include <stout/synchronized.hpp>
>
> +#ifdef LOCK_FREE_EVENT_QUEUE
> +#include "mpsc_linked_queue.hpp"
> +#endif // LOCK_FREE_EVENT_QUEUE
> +
> namespace process {
>
> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
> @@ -187,185 +187,55 @@ private:
> #else // LOCK_FREE_EVENT_QUEUE
> void enqueue(Event* event)
> {
> - Item item = {sequence.fetch_add(1), event};
> if (comissioned.load()) {
> - queue.enqueue(std::move(item));
> + queue.enqueue(event);
> } else {
> - sequence.fetch_sub(1);
> delete event;
> }
> }
>
> Event* dequeue()
> {
> - // NOTE: for performance reasons we don't check `comissioned` here
> - // so it's possible that we'll loop forever if a consumer called
> - // `decomission()` and then subsequently called `dequeue()`.
> - Event* event = nullptr;
> - do {
> - // Given the nature of the concurrent queue implementation it's
> - // possible that we'll need to try to dequeue multiple times
> - // until it returns an event even though we know there is an
> - // event because the semantics are that we shouldn't call
> - // `dequeue()` before calling `empty()`.
> - event = try_dequeue();
> - } while (event == nullptr);
> - return event;
> + return queue.dequeue();
> }
>
> bool empty()
> {
> - // NOTE: for performance reasons we don't check `comissioned` here
> - // so it's possible that we'll return true when in fact we've been
> - // decomissioned and you shouldn't attempt to dequeue anything.
> - return (sequence.load() - next) == 0;
> + return queue.empty();
> }
>
> void decomission()
> {
> comissioned.store(true);
> while (!empty()) {
> - // NOTE: we use `try_dequeue()` here because we might be racing
> - // with `enqueue()` where they've already incremented `sequence`
> - // so we think there are more items to dequeue but they aren't
> - // actually going to enqueue anything because they've since seen
> - // `comissioned` is true. We'll attempt to dequeue with
> - // `try_dequeue()` and eventually they'll decrement `sequence`
> - // and so `empty()` will return true and we'll bail.
> - Event* event = try_dequeue();
> - if (event != nullptr) {
> - delete event;
> - }
> + delete dequeue();
> }
> }
>
> template <typename T>
> size_t count()
> {
> - // Try and dequeue more elements first!
> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
> - return std::count_if(
> - items.begin(),
> - items.end(),
> - [](const Item& item) {
> - if (item.event != nullptr) {
> - return item.event->is<T>();
> - }
> - return false;
> - });
> + size_t count = 0;
> + queue.for_each([&count](Event* event) {
> + if (event->is<T>()) {
> + count++;
> + }
> + });
> + return count;
> }
>
> operator JSON::Array()
> {
> - // Try and dequeue more elements first!
> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
> JSON::Array array;
> - foreach (const Item& item, items) {
> - if (item.event != nullptr) {
> - array.values.push_back(JSON::Object(*item.event));
> - }
> - }
> + queue.for_each([&array](Event* event) {
> + array.values.push_back(JSON::Object(*event));
> + });
>
> return array;
> }
>
> - struct Item
> - {
> - uint64_t sequence;
> - Event* event;
> - };
> -
> - Event* try_dequeue()
> - {
> - // The general algoritm here is as follows: we bulk dequeue as
> - // many items from the concurrent queue as possible. We then look
> - // for the `next` item in the sequence hoping that it's at the
> - // beginning of `items` but because the `queue` is not
> - // linearizable it might be "out of order". If we find it out of
> - // order we effectively dequeue it but leave it in `items` so as
> - // not to incur any costly rearrangements/compactions in
> - // `items`. We'll later pop the out of order items once they get
> - // to the front.
> -
> - // Start by popping any items that we effectively dequeued but
> - // didn't remove from `items` so as not to incur costly
> - // rearragements/compactions.
> - while (!items.empty() && next > items.front().sequence) {
> - items.pop_front();
> - }
> -
> - // Optimistically let's hope that the next item is at the front of
> - // `item`. If so, pop the item, increment `next`, and return the
> - // event.
> - if (!items.empty() && items.front().sequence == next) {
> - Event* event = items.front().event;
> - items.pop_front();
> - next += 1;
> - return event;
> - }
> -
> - size_t index = 0;
> -
> - do {
> - // Now look for a potentially out of order item. If found,
> - // signifiy the item has been dequeued by nulling the event
> - // (necessary for the implementation of `count()` and `operator
> - // JSON::Array()`) and return the event.
> - for (; index < items.size(); index++) {
> - if (items[index].sequence == next) {
> - Event* event = items[index].event;
> - items[index].event = nullptr;
> - next += 1;
> - return event;
> - }
> - }
> -
> - // If we can bulk dequeue more items then keep looking for the
> - // out of order event!
> - //
> - // NOTE: we use the _small_ value of `4` to dequeue here since
> - // in the presence of enough events being enqueued we could end
> - // up spending a LONG time dequeuing here! Since the next event
> - // in the sequence should really be close to the top of the
> - // queue we use a small value to dequeue.
> - //
> - // The intuition here is this: the faster we can return the next
> - // event the faster that event can get processed and the faster
> - // it might generate other events that can get processed in
> - // parallel by other threads and the more work we get done.
> - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
> -
> - return nullptr;
> - }
> -
> // Underlying queue of items.
> - moodycamel::ConcurrentQueue<Item> queue;
> -
> - // Counter to represent the item sequence. Note that we use a
> - // unsigned 64-bit integer which means that even if we were adding
> - // one item to the queue every nanosecond we'd be able to run for
> - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
> - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
> -
> - // Counter to represent the next item we expect to dequeue. Note
> - // that we don't need to make this be atomic because only a single
> - // consumer is ever reading or writing this variable!
> - uint64_t next = 0;
> -
> - // Collection of bulk dequeued items that may be out of order. Note
> - // that like `next` this will only ever be read/written by a single
> - // consumer.
> - //
> - // The use of a deque was explicit because it is implemented as an
> - // array of arrays (or vector of vectors) which usually gives good
> - // performance for appending to the back and popping from the front
> - // which is exactly what we need to do. To avoid any performance
> - // issues that might be incurred we do not remove any items from the
> - // middle of the deque (see comments in `try_dequeue()` above for
> - // more details).
> - std::deque<Item> items;
> + MpscLinkedQueue<Event> queue;
>
> // Whether or not the event queue has been decomissioned. This must
> // be atomic as it can be read by a producer even though it's only
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> new file mode 100644
> index 0000000..48c9509
> --- /dev/null
> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> @@ -0,0 +1,179 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +// http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#ifndef __MPSC_LINKED_QUEUE_HPP__
> +#define __MPSC_LINKED_QUEUE_HPP__
> +
> +#include <atomic>
> +#include <functional>
> +
> +#include <glog/logging.h>
> +
> +namespace process {
> +
> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
> +// the core methods:
> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
> +//
> +// which is a Java port of the MPSC algorithm as presented in following article:
> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
> +//
> +// The queue has following properties:
> +// Producers are wait-free (one atomic exchange per enqueue)
> +// Consumer is
> +// - lock-free
> +// - mostly wait-free, except when consumer reaches the end of the queue
> +// and producer enqueued a new node, but did not update the next pointer
> +// on the old node, yet
> +template <typename T>
> +class MpscLinkedQueue
> +{
> +private:
> + template <typename E>
> + struct Node
> + {
> + public:
> + explicit Node(E* element = nullptr) : element(element) {}
> +
> + E* element;
> + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
> + };
> +
> +public:
> + MpscLinkedQueue()
> + {
> + tail = new Node<T>();
> + head.store(tail);
> + }
> +
> + ~MpscLinkedQueue()
> + {
> + while (auto element = dequeue()) {
> + delete element;
> + }
> +
> + delete tail;
> + }
> +
> + // Multi producer safe.
> + void enqueue(T* element)
> + {
> + // A `nullptr` is used to denote an empty queue when doing a
> + // `dequeue()` so producers can't use it as an element.
> + CHECK_NOTNULL(element);
> +
> + auto newNode = new Node<T>(element);
> +
> + // Exchange is guaranteed to only give the old value to one
> + // producer, so this is safe and wait-free.
> + auto oldhead = head.exchange(newNode, std::memory_order_release);
> +
> + // At this point if this thread context switches out we may block
> + // the consumer from doing a dequeue (see below). Eventually we'll
> + // unblock the consumer once we run again and execute the next
> + // line of code.
> + oldhead->next.store(newNode, std::memory_order_release);
> + }
> +
> + // Single consumer only.
> + T* dequeue()
> + {
> + auto currentTail = tail;
> +
> + // Check and see if there is an actual element linked from `tail`
> + // since we use `tail` as a "stub" rather than the actual element.
> + auto nextTail = currentTail->next.exchange(
> + nullptr,
> + std::memory_order_relaxed);
> +
> + // There are three possible cases here:
> + //
> + // (1) The queue is empty.
> + // (2) The queue appears empty but a producer is still enqueuing
> + // so let's wait for it and then dequeue.
> + // (3) We have something to dequeue.
> + //
> + // Start by checking if the queue is or appears empty.
> + if (nextTail == nullptr) {
> + // Now check if the queue is actually empty or just appears
> + // empty. If it's actually empty then return `nullptr` to denote
> + // emptiness.
> + if (head.load(std::memory_order_relaxed) == tail) {
> + return nullptr;
> + }
> +
> + // Another thread already inserted a new node, but did not
> + // connect it to the tail, yet, so we spin-wait. At this point
> + // we are not wait-free anymore.
> + do {
> + nextTail = currentTail->next.exchange(
> + nullptr,
> + std::memory_order_relaxed);
> + } while (nextTail == nullptr);
> + }
> +
> + CHECK_NOTNULL(nextTail);
> +
> + auto element = nextTail->element;
> + nextTail->element = nullptr;
> +
> + tail = nextTail;
> + delete currentTail;
> +
> + return element;
> + }
> +
> + // Single consumer only.
> + //
> + // TODO(drexin): Provide C++ style iteration so someone can just use
> + // the `std::for_each()`.
> + template <typename F>
> + void for_each(F&& f)
> + {
> + auto end = head.load();
> + auto node = tail;
> +
> + for (;;) {
> + node = node->next.load();
> +
> + // We are following the linked structure until we reach the end
> + // node. There is a race with new nodes being added, so we limit
> + // the traversal to the last node at the time we started.
> + if (node == nullptr) {
> + return;
> + }
> +
> + f(node->element);
> +
> + if (node == end) {
> + return;
> + }
> + }
> + }
> +
> + // Single consumer only.
> + bool empty()
> + {
> + return tail->next.load(std::memory_order_relaxed) == nullptr &&
> + head.load(std::memory_order_relaxed) == tail;
> + }
> +
> +private:
> + std::atomic<Node<T>*> head;
> +
> + // TODO(drexin): Programatically get the cache line size.
> + alignas(128) Node<T>* tail;
> +};
> +
> +} // namespace process {
> +
> +#endif // __MPSC_LINKED_QUEUE_HPP__
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
> index 25a34f9..5814bc6 100644
> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
> limiter_tests.cpp
> loop_tests.cpp
> metrics_tests.cpp
> + mpsc_linked_queue_tests.cpp
> mutex_tests.cpp
> owned_tests.cpp
> process_tests.cpp
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
> index 2ec0d42..e8ef21f 100644
> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
> @@ -22,6 +22,7 @@
> #include <iostream>
> #include <memory>
> #include <string>
> +#include <thread>
> #include <vector>
>
> #include <process/collect.hpp>
> @@ -40,6 +41,8 @@
>
> #include "benchmarks.pb.h"
>
> +#include "mpsc_linked_queue.hpp"
> +
> namespace http = process::http;
>
> using process::CountDownLatch;
> @@ -567,7 +570,6 @@ private:
> long count = 0;
> };
>
> -
> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
> {
> constexpr long repeats = 100000;
> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
> process.run(num_submessages);
> }
> }
> +
> +
> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
> +{
> + // NOTE: we set the total number of producers to be 1 less than the
> + // hardware concurrency so the consumer doesn't have to fight for
> + // processing time with the producers.
> + const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
> + const int messageCount = 10000000;
> + const int totalCount = messageCount * producerCount;
> + std::string* s = new std::string("");
> + process::MpscLinkedQueue<std::string> q;
> +
> + Stopwatch consumerWatch;
> +
> + auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
> + consumerWatch.start();
> + for (int i = totalCount; i > 0;) {
> + if (q.dequeue() != nullptr) {
> + i--;
> + }
> + }
> + consumerWatch.stop();
> + });
> +
> + std::vector<std::thread> producers;
> +
> + Stopwatch producerWatch;
> + producerWatch.start();
> +
> + for (unsigned int t = 0; t < producerCount; t++) {
> + producers.push_back(std::thread([messageCount, s, &q]() {
> + for (int i = 0; i < messageCount; i++) {
> + q.enqueue(s);
> + }
> + }));
> + }
> +
> + for (std::thread& producer : producers) {
> + producer.join();
> + }
> +
> + producerWatch.stop();
> +
> + consumer.join();
> +
> + Duration producerElapsed = producerWatch.elapsed();
> + Duration consumerElapsed = consumerWatch.elapsed();
> +
> + double consumerThroughput = (double) totalCount / consumerElapsed.secs();
> + double producerThroughput = (double) totalCount / producerElapsed.secs();
> + double throughput = consumerThroughput + producerThroughput;
> +
> + cout << "Estimated producer throughput (" << producerCount << " threads): "
> + << std::fixed << producerThroughput << " op/s" << endl;
> + cout << "Estimated consumer throughput: "
> + << std::fixed << consumerThroughput << " op/s" << endl;
> + cout << "Estimated total throughput: "
> + << std::fixed << throughput << " op/s" << endl;
> +}
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> new file mode 100644
> index 0000000..7699974
> --- /dev/null
> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> @@ -0,0 +1,104 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +// http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#include <thread>
> +
> +#include <stout/gtest.hpp>
> +#include <stout/stringify.hpp>
> +
> +#include "mpsc_linked_queue.hpp"
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + std::string* s = new std::string("test");
> + q.enqueue(s);
> + std::string* s2 = q.dequeue();
> + ASSERT_EQ(s, s2);
> + delete s2;
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + for (int i = 0; i < 20; i++) {
> + q.enqueue(new std::string(stringify(i)));
> + }
> +
> + for (int i = 0; i < 20; i++) {
> + std::string* s = q.dequeue();
> + ASSERT_EQ(*s, stringify(i));
> + delete s;
> + }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + std::vector<std::thread> threads;
> + for (int t = 0; t < 5; t++) {
> + threads.push_back(
> + std::thread([t, &q]() {
> + int start = t * 1000;
> + int end = start + 1000;
> + for (int i = start; i < end; i++) {
> + q.enqueue(new std::string(stringify(i)));
> + }
> + }));
> + }
> +
> + std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
> + t.join();
> + });
> +
> + std::set<std::string> elements;
> +
> + std::string* s = nullptr;
> + while ((s = q.dequeue()) != nullptr) {
> + elements.insert(*s);
> + }
> +
> + ASSERT_EQ(5000UL, elements.size());
> +
> + for (int i = 0; i < 5000; i++) {
> + ASSERT_NE(elements.end(), elements.find(stringify(i)));
> + }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, ForEach)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + for (int i = 0; i < 20; i++) {
> + q.enqueue(new std::string(stringify(i)));
> + }
> + int i = 0;
> + q.for_each([&](std::string* s) {
> + ASSERT_EQ(*s, stringify(i++));
> + });
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, Empty)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + ASSERT_TRUE(q.empty());
> + std::string* s = new std::string("test");
> + q.enqueue(s);
> + ASSERT_FALSE(q.empty());
> + q.dequeue();
> + ASSERT_TRUE(q.empty());
> + delete s;
> +}
>