You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2018/07/15 17:02:02 UTC

mesos git commit: Added mpsc_linked_queue and use it as the concurrent event queue.

Repository: mesos
Updated Branches:
  refs/heads/master a11a6a3d8 -> b1eafc035


Added mpsc_linked_queue and use it as the concurrent event queue.

https://reviews.apache.org/r/62515


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03

Branch: refs/heads/master
Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
Parents: a11a6a3
Author: Dario Rexin <dr...@apple.com>
Authored: Sat Jul 7 13:20:22 2018 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jul 15 09:55:28 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   1 +
 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
 .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
 6 files changed, 367 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 2d356aa..631491a 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -307,6 +307,7 @@ libprocess_tests_SOURCES =					\
   src/tests/loop_tests.cpp					\
   src/tests/main.cpp						\
   src/tests/metrics_tests.cpp					\
+  src/tests/mpsc_linked_queue_tests.cpp				\
   src/tests/mutex_tests.cpp					\
   src/tests/owned_tests.cpp					\
   src/tests/process_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
index 21c522d..999d552 100644
--- a/3rdparty/libprocess/src/event_queue.hpp
+++ b/3rdparty/libprocess/src/event_queue.hpp
@@ -17,10 +17,6 @@
 #include <mutex>
 #include <string>
 
-#ifdef LOCK_FREE_EVENT_QUEUE
-#include <concurrentqueue.h>
-#endif // LOCK_FREE_EVENT_QUEUE
-
 #include <process/event.hpp>
 #include <process/http.hpp>
 
@@ -28,6 +24,10 @@
 #include <stout/stringify.hpp>
 #include <stout/synchronized.hpp>
 
+#ifdef LOCK_FREE_EVENT_QUEUE
+#include "mpsc_linked_queue.hpp"
+#endif // LOCK_FREE_EVENT_QUEUE
+
 namespace process {
 
 // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
@@ -187,185 +187,55 @@ private:
 #else // LOCK_FREE_EVENT_QUEUE
+  // Hands `event` to the underlying queue while `comissioned` reads
+  // true; otherwise the event is deleted here and never enqueued.
+  // Either way the caller gives up ownership of `event`.
   void enqueue(Event* event)
   {
-    Item item = {sequence.fetch_add(1), event};
     if (comissioned.load()) {
-      queue.enqueue(std::move(item));
+      queue.enqueue(event);
     } else {
-      sequence.fetch_sub(1);
       delete event;
     }
   }
 
+  // Forwards to `MpscLinkedQueue::dequeue()`, which returns `nullptr`
+  // when the queue is empty instead of looping until an event shows
+  // up. `comissioned` is not consulted here.
   Event* dequeue()
   {
-    // NOTE: for performance reasons we don't check `comissioned` here
-    // so it's possible that we'll loop forever if a consumer called
-    // `decomission()` and then subsequently called `dequeue()`.
-    Event* event = nullptr;
-    do {
-      // Given the nature of the concurrent queue implementation it's
-      // possible that we'll need to try to dequeue multiple times
-      // until it returns an event even though we know there is an
-      // event because the semantics are that we shouldn't call
-      // `dequeue()` before calling `empty()`.
-      event = try_dequeue();
-    } while (event == nullptr);
-    return event;
+    return queue.dequeue();
   }
 
+  // Reports whether the underlying queue currently holds no events.
+  // `comissioned` is not consulted here.
   bool empty()
   {
-    // NOTE: for performance reasons we don't check `comissioned` here
-    // so it's possible that we'll return true when in fact we've been
-    // decomissioned and you shouldn't attempt to dequeue anything.
-    return (sequence.load() - next) == 0;
+    return queue.empty();
   }
 
   void decomission()
   {
     comissioned.store(true);
     while (!empty()) {
-      // NOTE: we use `try_dequeue()` here because we might be racing
-      // with `enqueue()` where they've already incremented `sequence`
-      // so we think there are more items to dequeue but they aren't
-      // actually going to enqueue anything because they've since seen
-      // `comissioned` is true. We'll attempt to dequeue with
-      // `try_dequeue()` and eventually they'll decrement `sequence`
-      // and so `empty()` will return true and we'll bail.
-      Event* event = try_dequeue();
-      if (event != nullptr) {
-        delete event;
-      }
+      delete dequeue();
     }
   }
 
+  // Counts the queued events for which `is<T>()` is true by walking
+  // the underlying queue with `for_each()`; nothing is dequeued.
   template <typename T>
   size_t count()
   {
-    // Try and dequeue more elements first!
-    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
-
-    return std::count_if(
-        items.begin(),
-        items.end(),
-        [](const Item& item) {
-          if (item.event != nullptr) {
-            return item.event->is<T>();
-          }
-          return false;
-        });
+    size_t count = 0;
+    queue.for_each([&count](Event* event) {
+      if (event->is<T>()) {
+        count++;
+      }
+    });
+    return count;
   }
 
+  // Builds a JSON array with one `JSON::Object` per queued event, in
+  // the order `for_each()` visits them; nothing is dequeued.
   operator JSON::Array()
   {
-    // Try and dequeue more elements first!
-    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
-
     JSON::Array array;
-    foreach (const Item& item, items) {
-      if (item.event != nullptr) {
-        array.values.push_back(JSON::Object(*item.event));
-      }
-    }
+    queue.for_each([&array](Event* event) {
+      array.values.push_back(JSON::Object(*event));
+    });
 
     return array;
   }
 
-  struct Item
-  {
-    uint64_t sequence;
-    Event* event;
-  };
-
-  Event* try_dequeue()
-  {
-    // The general algoritm here is as follows: we bulk dequeue as
-    // many items from the concurrent queue as possible. We then look
-    // for the `next` item in the sequence hoping that it's at the
-    // beginning of `items` but because the `queue` is not
-    // linearizable it might be "out of order". If we find it out of
-    // order we effectively dequeue it but leave it in `items` so as
-    // not to incur any costly rearrangements/compactions in
-    // `items`. We'll later pop the out of order items once they get
-    // to the front.
-
-    // Start by popping any items that we effectively dequeued but
-    // didn't remove from `items` so as not to incur costly
-    // rearragements/compactions.
-    while (!items.empty() && next > items.front().sequence) {
-      items.pop_front();
-    }
-
-    // Optimistically let's hope that the next item is at the front of
-    // `item`. If so, pop the item, increment `next`, and return the
-    // event.
-    if (!items.empty() && items.front().sequence == next) {
-      Event* event = items.front().event;
-      items.pop_front();
-      next += 1;
-      return event;
-    }
-
-    size_t index = 0;
-
-    do {
-      // Now look for a potentially out of order item. If found,
-      //  signifiy the item has been dequeued by nulling the event
-      //  (necessary for the implementation of `count()` and `operator
-      //  JSON::Array()`) and return the event.
-      for (; index < items.size(); index++) {
-        if (items[index].sequence == next) {
-          Event* event = items[index].event;
-          items[index].event = nullptr;
-          next += 1;
-          return event;
-        }
-      }
-
-      // If we can bulk dequeue more items then keep looking for the
-      // out of order event!
-      //
-      // NOTE: we use the _small_ value of `4` to dequeue here since
-      // in the presence of enough events being enqueued we could end
-      // up spending a LONG time dequeuing here! Since the next event
-      // in the sequence should really be close to the top of the
-      // queue we use a small value to dequeue.
-      //
-      // The intuition here is this: the faster we can return the next
-      // event the faster that event can get processed and the faster
-      // it might generate other events that can get processed in
-      // parallel by other threads and the more work we get done.
-    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
-
-    return nullptr;
-  }
-
   // Underlying queue of items.
-  moodycamel::ConcurrentQueue<Item> queue;
-
-  // Counter to represent the item sequence. Note that we use a
-  // unsigned 64-bit integer which means that even if we were adding
-  // one item to the queue every nanosecond we'd be able to run for
-  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
-  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
-
-  // Counter to represent the next item we expect to dequeue. Note
-  // that we don't need to make this be atomic because only a single
-  // consumer is ever reading or writing this variable!
-  uint64_t next = 0;
-
-  // Collection of bulk dequeued items that may be out of order. Note
-  // that like `next` this will only ever be read/written by a single
-  // consumer.
-  //
-  // The use of a deque was explicit because it is implemented as an
-  // array of arrays (or vector of vectors) which usually gives good
-  // performance for appending to the back and popping from the front
-  // which is exactly what we need to do. To avoid any performance
-  // issues that might be incurred we do not remove any items from the
-  // middle of the deque (see comments in `try_dequeue()` above for
-  // more details).
-  std::deque<Item> items;
+  MpscLinkedQueue<Event> queue;
 
   // Whether or not the event queue has been decomissioned. This must
   // be atomic as it can be read by a producer even though it's only

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
new file mode 100644
index 0000000..48c9509
--- /dev/null
+++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
@@ -0,0 +1,179 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __MPSC_LINKED_QUEUE_HPP__
+#define __MPSC_LINKED_QUEUE_HPP__
+
+#include <atomic>
+#include <functional>
+
+#include <glog/logging.h>
+
+namespace process {
+
+// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
+// the core methods:
+// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
+//
+// which is a Java port of the MPSC algorithm as presented in the following
+// article:
+// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+//
+// The queue has the following properties:
+//   Producers are wait-free (one atomic exchange per enqueue)
+//   Consumer is
+//     - lock-free
+//     - mostly wait-free, except when the consumer reaches the end of the
+//       queue and a producer has enqueued a new node but has not yet
+//       updated the next pointer on the old node
+//
+// Naming: producers append at `head`, the consumer removes from `tail`.
+// `tail` always points at a "stub" node whose element is not part of
+// the queue; the oldest queued element lives in `tail->next`.
+//
+// Ownership: the queue stores raw `T*`. A dequeued pointer belongs to
+// the caller again; elements still queued at destruction are deleted
+// by the queue.
+template <typename T>
+class MpscLinkedQueue
+{
+private:
+  // Singly linked list node. `next` is the only field accessed by
+  // producers and the consumer concurrently, hence the atomic.
+  template <typename E>
+  struct Node
+  {
+  public:
+    explicit Node(E* element = nullptr) : element(element) {}
+
+    E* element;
+    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
+  };
+
+public:
+  // Creates an empty queue consisting of just the stub node.
+  MpscLinkedQueue()
+  {
+    tail = new Node<T>();
+    head.store(tail);
+  }
+
+  // The queue owns its nodes and elements through raw pointers, so a
+  // copy would delete them twice. The atomic member already suppresses
+  // the implicit copies; spell that out.
+  MpscLinkedQueue(const MpscLinkedQueue&) = delete;
+  MpscLinkedQueue& operator=(const MpscLinkedQueue&) = delete;
+
+  // Deletes every element still queued, then the stub node. Must not
+  // race with producers or the consumer.
+  ~MpscLinkedQueue()
+  {
+    while (auto element = dequeue()) {
+      delete element;
+    }
+
+    delete tail;
+  }
+
+  // Multi producer safe.
+  //
+  // Appends `element` to the queue. `element` must not be `nullptr`.
+  void enqueue(T* element)
+  {
+    // A `nullptr` is used to denote an empty queue when doing a
+    // `dequeue()` so producers can't use it as an element.
+    CHECK_NOTNULL(element);
+
+    auto newNode = new Node<T>(element);
+
+    // Exchange is guaranteed to only give the old value to one
+    // producer, so this is safe and wait-free.
+    //
+    // The exchange needs both halves of `acq_rel`:
+    //   - release, so the construction of `newNode` is visible to the
+    //     next producer that obtains it from `head`;
+    //   - acquire, so the construction of `oldhead` (done by another
+    //     producer) is visible to us before we write to
+    //     `oldhead->next` below. With release only, that write raced
+    //     with the other producer's initialization of the node.
+    auto oldhead = head.exchange(newNode, std::memory_order_acq_rel);
+
+    // At this point if this thread context switches out we may block
+    // the consumer from doing a dequeue (see below). Eventually we'll
+    // unblock the consumer once we run again and execute the next
+    // line of code.
+    oldhead->next.store(newNode, std::memory_order_release);
+  }
+
+  // Single consumer only.
+  //
+  // Removes and returns the oldest element, or `nullptr` if the queue
+  // is empty. The caller owns the returned element.
+  T* dequeue()
+  {
+    auto currentTail = tail;
+
+    // Check and see if there is an actual element linked from `tail`
+    // since we use `tail` as a "stub" rather than the actual element.
+    //
+    // This must be an acquire load: it pairs with the release store
+    // to `next` in `enqueue()` and is what makes the producer's
+    // initialization of the node (in particular `element`) visible
+    // before we dereference `nextTail` below. A relaxed access gives
+    // no such guarantee. A plain load suffices because `currentTail`
+    // is deleted below, so there is no point in clearing its `next`.
+    auto nextTail = currentTail->next.load(std::memory_order_acquire);
+
+    // There are three possible cases here:
+    //
+    // (1) The queue is empty.
+    // (2) The queue appears empty but a producer is still enqueuing
+    //     so let's wait for it and then dequeue.
+    // (3) We have something to dequeue.
+    //
+    // Start by checking if the queue is or appears empty.
+    if (nextTail == nullptr) {
+      // Now check if the queue is actually empty or just appears
+      // empty. If it's actually empty then return `nullptr` to denote
+      // emptiness. Only the pointer value is compared, nothing is
+      // dereferenced, so a relaxed load is enough here.
+      if (head.load(std::memory_order_relaxed) == tail) {
+        return nullptr;
+      }
+
+      // Another thread already inserted a new node, but did not
+      // connect it to the tail, yet, so we spin-wait. At this point
+      // we are not wait-free anymore.
+      do {
+        nextTail = currentTail->next.load(std::memory_order_acquire);
+      } while (nextTail == nullptr);
+    }
+
+    CHECK_NOTNULL(nextTail);
+
+    // `nextTail` becomes the new stub: take its element out and drop
+    // the old stub.
+    auto element = nextTail->element;
+    nextTail->element = nullptr;
+
+    tail = nextTail;
+    delete currentTail;
+
+    return element;
+  }
+
+  // Single consumer only.
+  //
+  // Calls `f` with each queued element, oldest first, without
+  // removing anything.
+  //
+  // TODO(drexin): Provide C++ style iteration so someone can just use
+  // the `std::for_each()`.
+  template <typename F>
+  void for_each(F&& f)
+  {
+    auto end = head.load();
+    auto node = tail;
+
+    for (;;) {
+      node = node->next.load();
+
+      // We are following the linked structure until we reach the end
+      // node. There is a race with new nodes being added, so we limit
+      // the traversal to the last node at the time we started. A
+      // `nullptr` link means either the queue is empty or a producer
+      // has not linked its node yet; stop in both cases.
+      if (node == nullptr) {
+        return;
+      }
+
+      f(node->element);
+
+      if (node == end) {
+        return;
+      }
+    }
+  }
+
+  // Single consumer only.
+  //
+  // Returns true if nothing is linked after the stub and no producer
+  // has swapped itself into `head` yet. Only pointer values are
+  // compared, so relaxed loads are enough.
+  bool empty()
+  {
+    return tail->next.load(std::memory_order_relaxed) == nullptr &&
+      head.load(std::memory_order_relaxed) == tail;
+  }
+
+private:
+  // Most recently enqueued node; written by producers.
+  std::atomic<Node<T>*> head;
+
+  // Stub node in front of the oldest element; only touched by the
+  // consumer. Aligned so it does not share a cache line with `head`.
+  //
+  // TODO(drexin): Programmatically get the cache line size.
+  alignas(128) Node<T>* tail;
+};
+
+} // namespace process {
+
+#endif // __MPSC_LINKED_QUEUE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
index 25a34f9..5814bc6 100644
--- a/3rdparty/libprocess/src/tests/CMakeLists.txt
+++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
@@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
   limiter_tests.cpp
   loop_tests.cpp
   metrics_tests.cpp
+  mpsc_linked_queue_tests.cpp
   mutex_tests.cpp
   owned_tests.cpp
   process_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 2ec0d42..e8ef21f 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -22,6 +22,7 @@
 #include <iostream>
 #include <memory>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <process/collect.hpp>
@@ -40,6 +41,8 @@
 
 #include "benchmarks.pb.h"
 
+#include "mpsc_linked_queue.hpp"
+
 namespace http = process::http;
 
 using process::CountDownLatch;
@@ -567,7 +570,6 @@ private:
   long count = 0;
 };
 
-
 TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
 {
   constexpr long repeats = 100000;
@@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
     process.run(num_submessages);
   }
 }
+
+
+// Measures enqueue/dequeue throughput of `MpscLinkedQueue` with one
+// consumer thread and as many producer threads as there are spare
+// hardware threads.
+TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
+{
+  // NOTE: we set the total number of producers to be 1 less than the
+  // hardware concurrency so the consumer doesn't have to fight for
+  // processing time with the producers.
+  //
+  // `hardware_concurrency()` may return 0 when the value cannot be
+  // determined, and returns 1 on a single core machine. Fall back to
+  // one producer in both cases: `0 - 1` would wrap around to UINT_MAX
+  // threads and 0 producers would benchmark nothing.
+  const unsigned int hardwareThreads = std::thread::hardware_concurrency();
+  const unsigned int producerCount =
+    hardwareThreads > 1 ? hardwareThreads - 1 : 1;
+
+  const int messageCount = 10000000;
+
+  // Computed in `long long` since the product does not fit in an
+  // `int` once there are more than ~214 producers.
+  const long long totalCount =
+    static_cast<long long>(messageCount) * producerCount;
+
+  // Every producer enqueues this one string over and over, so the
+  // consumer must not delete what it dequeues. It is freed once at the
+  // end of the test instead.
+  std::string* s = new std::string("");
+  process::MpscLinkedQueue<std::string> q;
+
+  Stopwatch consumerWatch;
+
+  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
+    consumerWatch.start();
+    for (long long i = totalCount; i > 0;) {
+      // `nullptr` just means the producers have not caught up yet.
+      if (q.dequeue() != nullptr) {
+        i--;
+      }
+    }
+    consumerWatch.stop();
+  });
+
+  std::vector<std::thread> producers;
+
+  Stopwatch producerWatch;
+  producerWatch.start();
+
+  for (unsigned int t = 0; t < producerCount; t++) {
+    producers.push_back(std::thread([messageCount, s, &q]() {
+      for (int i = 0; i < messageCount; i++) {
+        q.enqueue(s);
+      }
+    }));
+  }
+
+  for (std::thread& producer : producers) {
+    producer.join();
+  }
+
+  producerWatch.stop();
+
+  consumer.join();
+
+  // The consumer has dequeued every message, so the queue is empty
+  // and its destructor will not touch `s`; release it here.
+  delete s;
+
+  Duration producerElapsed = producerWatch.elapsed();
+  Duration consumerElapsed = consumerWatch.elapsed();
+
+  double consumerThroughput =
+    static_cast<double>(totalCount) / consumerElapsed.secs();
+  double producerThroughput =
+    static_cast<double>(totalCount) / producerElapsed.secs();
+  double throughput = consumerThroughput + producerThroughput;
+
+  cout << "Estimated producer throughput (" << producerCount << " threads): "
+       << std::fixed << producerThroughput << " op/s" << endl;
+  cout << "Estimated consumer throughput: "
+       << std::fixed << consumerThroughput << " op/s" << endl;
+  cout << "Estimated total throughput: "
+       << std::fixed << throughput << " op/s" << endl;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
new file mode 100644
index 0000000..7699974
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
@@ -0,0 +1,104 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <algorithm>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <stout/gtest.hpp>
+#include <stout/stringify.hpp>
+
+#include "mpsc_linked_queue.hpp"
+
+
+// A single enqueued element comes back out as the very same pointer.
+TEST(MpscLinkedQueueTest, EnqueueDequeue)
+{
+  process::MpscLinkedQueue<std::string> queue;
+
+  std::string* const enqueued = new std::string("test");
+  queue.enqueue(enqueued);
+
+  std::string* const dequeued = queue.dequeue();
+  ASSERT_EQ(enqueued, dequeued);
+
+  // Dequeuing handed ownership back to us.
+  delete dequeued;
+}
+
+
+// Elements enqueued from a single thread are dequeued in FIFO order.
+TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
+{
+  process::MpscLinkedQueue<std::string> q;
+  for (int i = 0; i < 20; i++) {
+    q.enqueue(new std::string(stringify(i)));
+  }
+
+  for (int i = 0; i < 20; i++) {
+    std::string* s = q.dequeue();
+
+    // `dequeue()` returns `nullptr` on an empty queue; fail the test
+    // instead of dereferencing it if an element went missing.
+    ASSERT_TRUE(s != nullptr);
+
+    ASSERT_EQ(*s, stringify(i));
+    delete s;
+  }
+
+  // Exactly the 20 enqueued elements must have been in the queue.
+  ASSERT_TRUE(q.empty());
+}
+
+
+// Five concurrent producers each enqueue a disjoint range of 1000
+// numbers; afterwards every one of the 5000 values must be dequeued
+// exactly once (order across producers is not checked).
+TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
+{
+  process::MpscLinkedQueue<std::string> q;
+  std::vector<std::thread> threads;
+  for (int t = 0; t < 5; t++) {
+    threads.push_back(
+        std::thread([t, &q]() {
+          int start = t * 1000;
+          int end = start + 1000;
+          for (int i = start; i < end; i++) {
+            q.enqueue(new std::string(stringify(i)));
+          }
+        }));
+  }
+
+  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
+    t.join();
+  });
+
+  std::set<std::string> elements;
+
+  std::string* s = nullptr;
+  while ((s = q.dequeue()) != nullptr) {
+    elements.insert(*s);
+
+    // The set holds a copy and dequeuing handed ownership back to us,
+    // so release the element; previously all 5000 strings leaked.
+    delete s;
+  }
+
+  ASSERT_EQ(5000UL, elements.size());
+
+  for (int i = 0; i < 5000; i++) {
+    ASSERT_NE(elements.end(), elements.find(stringify(i)));
+  }
+}
+
+
+// `for_each()` visits every queued element in FIFO order without
+// removing it.
+TEST(MpscLinkedQueueTest, ForEach)
+{
+  process::MpscLinkedQueue<std::string> q;
+  for (int i = 0; i < 20; i++) {
+    q.enqueue(new std::string(stringify(i)));
+  }
+  int i = 0;
+  q.for_each([&](std::string* s) {
+    ASSERT_EQ(*s, stringify(i++));
+  });
+
+  // Without this the test would also pass if `for_each()` visited
+  // fewer elements (or none at all).
+  ASSERT_EQ(20, i);
+
+  // The elements are still queued; the queue's destructor deletes them.
+}
+
+
+// `empty()` tracks the queue going from empty to non-empty and back.
+TEST(MpscLinkedQueueTest, Empty)
+{
+  process::MpscLinkedQueue<std::string> queue;
+
+  // Freshly constructed: nothing queued.
+  ASSERT_TRUE(queue.empty());
+
+  std::string* element = new std::string("test");
+  queue.enqueue(element);
+  ASSERT_FALSE(queue.empty());
+
+  // Removing the only element empties the queue again.
+  queue.dequeue();
+  ASSERT_TRUE(queue.empty());
+
+  delete element;
+}


Re: mesos git commit: Added mpsc_linked_queue and use it as the concurrent event queue.

Posted by Dario Rexin <dr...@apple.com>.
By the way, I used 128 bytes for the alignment here because of PowerPC. We don’t depend on this code performing well on PPC, but since there has been some work on PPC support I thought I’d make it work there as well. If you think we should default to the x86 cache line size instead, I’ll change that as well.

> On Jul 16, 2018, at 8:39 AM, Dario Rexin <dr...@apple.com> wrote:
> 
> Hi Benjamin,
> 
> see comments inline.
> 
>> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier <be...@mesosphere.io> wrote:
>> 
>> Hi Dario,
>> 
>> this patch introduced two new clang-tidy warnings. Could we try to get these down to zero, even if the code does not look bad?
>> 
>> 
>> I already created a patch for the unused lambda capture,
>> 
>>   https://reviews.apache.org/r/67927/
>> 
>> While the code does look reasonable, as a somewhat weird exception C++ allows referencing some variables without capturing them.
>> 
> 
> Yes, I was a bit confused by the MSVC error there. I added the explicit capture because I thought it would be preferable over implicit capture, but I’m fine with either. Thanks for creating the patch!
> 
>> 
>> I also looked into the warning on the “excessive padding”. Adding some explicit padding seems to make clang-tidy content, but I wasn’t sure whether we just wanted to put `head` and `tail` on separate cache lines, or also cared about the padding added after `tail`.
>> 
>>  private:
>>     std::atomic<Node<T>*> head;
>> 
>>     char padding[128 - sizeof(std::atomic<Node<T>*>)];
>> 
>>     // TODO(drexin): Programatically get the cache line size.
>>     alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
>> 
>> Could you put up a patch for that? You can run the linter yourself; it is `support/mesos-tidy.sh`.
>> 
> 
> That’s interesting. The padding after tail is a good point, we should definitely add that to prevent false sharing. If we add the padding, is alignas still necessary?
> 
> Thanks,
> Dario
> 
>> 
>> Cheers,
>> 
>> Benjamin
>> 
>> 
>>> On Jul 15, 2018, at 7:02 PM, benh@apache.org wrote:
>>> 
>>> Repository: mesos
>>> Updated Branches:
>>> refs/heads/master a11a6a3d8 -> b1eafc035
>>> 
>>> 
>>> Added mpsc_linked_queue and use it as the concurrent event queue.
>>> 
>>> https://reviews.apache.org/r/62515
>>> 
>>> 
>>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
>>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
>>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>>> 
>>> Branch: refs/heads/master
>>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
>>> Parents: a11a6a3
>>> Author: Dario Rexin <dr...@apple.com>
>>> Authored: Sat Jul 7 13:20:22 2018 -0700
>>> Committer: Benjamin Hindman <be...@gmail.com>
>>> Committed: Sun Jul 15 09:55:28 2018 -0700
>>> 
>>> ----------------------------------------------------------------------
>>> 3rdparty/libprocess/Makefile.am                 |   1 +
>>> 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
>>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
>>> 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
>>> 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
>>> .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
>>> 6 files changed, 367 insertions(+), 150 deletions(-)
>>> ----------------------------------------------------------------------
>>> 
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
>>> index 2d356aa..631491a 100644
>>> --- a/3rdparty/libprocess/Makefile.am
>>> +++ b/3rdparty/libprocess/Makefile.am
>>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES =					\
>>> src/tests/loop_tests.cpp					\
>>> src/tests/main.cpp						\
>>> src/tests/metrics_tests.cpp					\
>>> +  src/tests/mpsc_linked_queue_tests.cpp				\
>>> src/tests/mutex_tests.cpp					\
>>> src/tests/owned_tests.cpp					\
>>> src/tests/process_tests.cpp					\
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
>>> index 21c522d..999d552 100644
>>> --- a/3rdparty/libprocess/src/event_queue.hpp
>>> +++ b/3rdparty/libprocess/src/event_queue.hpp
>>> @@ -17,10 +17,6 @@
>>> #include <mutex>
>>> #include <string>
>>> 
>>> -#ifdef LOCK_FREE_EVENT_QUEUE
>>> -#include <concurrentqueue.h>
>>> -#endif // LOCK_FREE_EVENT_QUEUE
>>> -
>>> #include <process/event.hpp>
>>> #include <process/http.hpp>
>>> 
>>> @@ -28,6 +24,10 @@
>>> #include <stout/stringify.hpp>
>>> #include <stout/synchronized.hpp>
>>> 
>>> +#ifdef LOCK_FREE_EVENT_QUEUE
>>> +#include "mpsc_linked_queue.hpp"
>>> +#endif // LOCK_FREE_EVENT_QUEUE
>>> +
>>> namespace process {
>>> 
>>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
>>> @@ -187,185 +187,55 @@ private:
>>> #else // LOCK_FREE_EVENT_QUEUE
>>> void enqueue(Event* event)
>>> {
>>> -    Item item = {sequence.fetch_add(1), event};
>>>   if (comissioned.load()) {
>>> -      queue.enqueue(std::move(item));
>>> +      queue.enqueue(event);
>>>   } else {
>>> -      sequence.fetch_sub(1);
>>>     delete event;
>>>   }
>>> }
>>> 
>>> Event* dequeue()
>>> {
>>> -    // NOTE: for performance reasons we don't check `comissioned` here
>>> -    // so it's possible that we'll loop forever if a consumer called
>>> -    // `decomission()` and then subsequently called `dequeue()`.
>>> -    Event* event = nullptr;
>>> -    do {
>>> -      // Given the nature of the concurrent queue implementation it's
>>> -      // possible that we'll need to try to dequeue multiple times
>>> -      // until it returns an event even though we know there is an
>>> -      // event because the semantics are that we shouldn't call
>>> -      // `dequeue()` before calling `empty()`.
>>> -      event = try_dequeue();
>>> -    } while (event == nullptr);
>>> -    return event;
>>> +    return queue.dequeue();
>>> }
>>> 
>>> bool empty()
>>> {
>>> -    // NOTE: for performance reasons we don't check `comissioned` here
>>> -    // so it's possible that we'll return true when in fact we've been
>>> -    // decomissioned and you shouldn't attempt to dequeue anything.
>>> -    return (sequence.load() - next) == 0;
>>> +    return queue.empty();
>>> }
>>> 
>>> void decomission()
>>> {
>>>   comissioned.store(true);
>>>   while (!empty()) {
>>> -      // NOTE: we use `try_dequeue()` here because we might be racing
>>> -      // with `enqueue()` where they've already incremented `sequence`
>>> -      // so we think there are more items to dequeue but they aren't
>>> -      // actually going to enqueue anything because they've since seen
>>> -      // `comissioned` is true. We'll attempt to dequeue with
>>> -      // `try_dequeue()` and eventually they'll decrement `sequence`
>>> -      // and so `empty()` will return true and we'll bail.
>>> -      Event* event = try_dequeue();
>>> -      if (event != nullptr) {
>>> -        delete event;
>>> -      }
>>> +      delete dequeue();
>>>   }
>>> }
>>> 
>>> template <typename T>
>>> size_t count()
>>> {
>>> -    // Try and dequeue more elements first!
>>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>>> -
>>> -    return std::count_if(
>>> -        items.begin(),
>>> -        items.end(),
>>> -        [](const Item& item) {
>>> -          if (item.event != nullptr) {
>>> -            return item.event->is<T>();
>>> -          }
>>> -          return false;
>>> -        });
>>> +    size_t count = 0;
>>> +    queue.for_each([&count](Event* event) {
>>> +      if (event->is<T>()) {
>>> +        count++;
>>> +      }
>>> +    });
>>> +    return count;
>>> }
>>> 
>>> operator JSON::Array()
>>> {
>>> -    // Try and dequeue more elements first!
>>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>>> -
>>>   JSON::Array array;
>>> -    foreach (const Item& item, items) {
>>> -      if (item.event != nullptr) {
>>> -        array.values.push_back(JSON::Object(*item.event));
>>> -      }
>>> -    }
>>> +    queue.for_each([&array](Event* event) {
>>> +      array.values.push_back(JSON::Object(*event));
>>> +    });
>>> 
>>>   return array;
>>> }
>>> 
>>> -  struct Item
>>> -  {
>>> -    uint64_t sequence;
>>> -    Event* event;
>>> -  };
>>> -
>>> -  Event* try_dequeue()
>>> -  {
>>> -    // The general algoritm here is as follows: we bulk dequeue as
>>> -    // many items from the concurrent queue as possible. We then look
>>> -    // for the `next` item in the sequence hoping that it's at the
>>> -    // beginning of `items` but because the `queue` is not
>>> -    // linearizable it might be "out of order". If we find it out of
>>> -    // order we effectively dequeue it but leave it in `items` so as
>>> -    // not to incur any costly rearrangements/compactions in
>>> -    // `items`. We'll later pop the out of order items once they get
>>> -    // to the front.
>>> -
>>> -    // Start by popping any items that we effectively dequeued but
>>> -    // didn't remove from `items` so as not to incur costly
>>> -    // rearragements/compactions.
>>> -    while (!items.empty() && next > items.front().sequence) {
>>> -      items.pop_front();
>>> -    }
>>> -
>>> -    // Optimistically let's hope that the next item is at the front of
>>> -    // `item`. If so, pop the item, increment `next`, and return the
>>> -    // event.
>>> -    if (!items.empty() && items.front().sequence == next) {
>>> -      Event* event = items.front().event;
>>> -      items.pop_front();
>>> -      next += 1;
>>> -      return event;
>>> -    }
>>> -
>>> -    size_t index = 0;
>>> -
>>> -    do {
>>> -      // Now look for a potentially out of order item. If found,
>>> -      //  signifiy the item has been dequeued by nulling the event
>>> -      //  (necessary for the implementation of `count()` and `operator
>>> -      //  JSON::Array()`) and return the event.
>>> -      for (; index < items.size(); index++) {
>>> -        if (items[index].sequence == next) {
>>> -          Event* event = items[index].event;
>>> -          items[index].event = nullptr;
>>> -          next += 1;
>>> -          return event;
>>> -        }
>>> -      }
>>> -
>>> -      // If we can bulk dequeue more items then keep looking for the
>>> -      // out of order event!
>>> -      //
>>> -      // NOTE: we use the _small_ value of `4` to dequeue here since
>>> -      // in the presence of enough events being enqueued we could end
>>> -      // up spending a LONG time dequeuing here! Since the next event
>>> -      // in the sequence should really be close to the top of the
>>> -      // queue we use a small value to dequeue.
>>> -      //
>>> -      // The intuition here is this: the faster we can return the next
>>> -      // event the faster that event can get processed and the faster
>>> -      // it might generate other events that can get processed in
>>> -      // parallel by other threads and the more work we get done.
>>> -    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
>>> -
>>> -    return nullptr;
>>> -  }
>>> -
>>> // Underlying queue of items.
>>> -  moodycamel::ConcurrentQueue<Item> queue;
>>> -
>>> -  // Counter to represent the item sequence. Note that we use a
>>> -  // unsigned 64-bit integer which means that even if we were adding
>>> -  // one item to the queue every nanosecond we'd be able to run for
>>> -  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
>>> -  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
>>> -
>>> -  // Counter to represent the next item we expect to dequeue. Note
>>> -  // that we don't need to make this be atomic because only a single
>>> -  // consumer is ever reading or writing this variable!
>>> -  uint64_t next = 0;
>>> -
>>> -  // Collection of bulk dequeued items that may be out of order. Note
>>> -  // that like `next` this will only ever be read/written by a single
>>> -  // consumer.
>>> -  //
>>> -  // The use of a deque was explicit because it is implemented as an
>>> -  // array of arrays (or vector of vectors) which usually gives good
>>> -  // performance for appending to the back and popping from the front
>>> -  // which is exactly what we need to do. To avoid any performance
>>> -  // issues that might be incurred we do not remove any items from the
>>> -  // middle of the deque (see comments in `try_dequeue()` above for
>>> -  // more details).
>>> -  std::deque<Item> items;
>>> +  MpscLinkedQueue<Event> queue;
>>> 
>>> // Whether or not the event queue has been decomissioned. This must
>>> // be atomic as it can be read by a producer even though it's only
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> new file mode 100644
>>> index 0000000..48c9509
>>> --- /dev/null
>>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> @@ -0,0 +1,179 @@
>>> +// Licensed under the Apache License, Version 2.0 (the "License");
>>> +// you may not use this file except in compliance with the License.
>>> +// You may obtain a copy of the License at
>>> +//
>>> +//     http://www.apache.org/licenses/LICENSE-2.0
>>> +//
>>> +// Unless required by applicable law or agreed to in writing, software
>>> +// distributed under the License is distributed on an "AS IS" BASIS,
>>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> +// See the License for the specific language governing permissions and
>>> +// limitations under the License
>>> +
>>> +#ifndef __MPSC_LINKED_QUEUE_HPP__
>>> +#define __MPSC_LINKED_QUEUE_HPP__
>>> +
>>> +#include <atomic>
>>> +#include <functional>
>>> +
>>> +#include <glog/logging.h>
>>> +
>>> +namespace process {
>>> +
>>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
>>> +// the core methods:
>>> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
>>> +//
>>> +// which is a Java port of the MPSC algorithm as presented in following article:
>>> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
>>> +//
>>> +// The queue has following properties:
>>> +//   Producers are wait-free (one atomic exchange per enqueue)
>>> +//   Consumer is
>>> +//     - lock-free
>>> +//     - mostly wait-free, except when consumer reaches the end of the queue
>>> +//       and producer enqueued a new node, but did not update the next pointer
>>> +//       on the old node, yet
>>> +template <typename T>
>>> +class MpscLinkedQueue
>>> +{
>>> +private:
>>> +  template <typename E>
>>> +  struct Node
>>> +  {
>>> +  public:
>>> +    explicit Node(E* element = nullptr) : element(element) {}
>>> +
>>> +    E* element;
>>> +    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
>>> +  };
>>> +
>>> +public:
>>> +  MpscLinkedQueue()
>>> +  {
>>> +    tail = new Node<T>();
>>> +    head.store(tail);
>>> +  }
>>> +
>>> +  ~MpscLinkedQueue()
>>> +  {
>>> +    while (auto element = dequeue()) {
>>> +      delete element;
>>> +    }
>>> +
>>> +    delete tail;
>>> +  }
>>> +
>>> +  // Multi producer safe.
>>> +  void enqueue(T* element)
>>> +  {
>>> +    // A `nullptr` is used to denote an empty queue when doing a
>>> +    // `dequeue()` so producers can't use it as an element.
>>> +    CHECK_NOTNULL(element);
>>> +
>>> +    auto newNode = new Node<T>(element);
>>> +
>>> +    // Exchange is guaranteed to only give the old value to one
>>> +    // producer, so this is safe and wait-free.
>>> +    auto oldhead = head.exchange(newNode, std::memory_order_release);
>>> +
>>> +    // At this point if this thread context switches out we may block
>>> +    // the consumer from doing a dequeue (see below). Eventually we'll
>>> +    // unblock the consumer once we run again and execute the next
>>> +    // line of code.
>>> +    oldhead->next.store(newNode, std::memory_order_release);
>>> +  }
>>> +
>>> +  // Single consumer only.
>>> +  T* dequeue()
>>> +  {
>>> +    auto currentTail = tail;
>>> +
>>> +    // Check and see if there is an actual element linked from `tail`
>>> +    // since we use `tail` as a "stub" rather than the actual element.
>>> +    auto nextTail = currentTail->next.exchange(
>>> +        nullptr,
>>> +        std::memory_order_relaxed);
>>> +
>>> +    // There are three possible cases here:
>>> +    //
>>> +    // (1) The queue is empty.
>>> +    // (2) The queue appears empty but a producer is still enqueuing
>>> +    //     so let's wait for it and then dequeue.
>>> +    // (3) We have something to dequeue.
>>> +    //
>>> +    // Start by checking if the queue is or appears empty.
>>> +    if (nextTail == nullptr) {
>>> +      // Now check if the queue is actually empty or just appears
>>> +      // empty. If it's actually empty then return `nullptr` to denote
>>> +      // emptiness.
>>> +      if (head.load(std::memory_order_relaxed) == tail) {
>>> +        return nullptr;
>>> +      }
>>> +
>>> +      // Another thread already inserted a new node, but did not
>>> +      // connect it to the tail, yet, so we spin-wait. At this point
>>> +      // we are not wait-free anymore.
>>> +      do {
>>> +        nextTail = currentTail->next.exchange(
>>> +            nullptr,
>>> +            std::memory_order_relaxed);
>>> +      } while (nextTail == nullptr);
>>> +    }
>>> +
>>> +    CHECK_NOTNULL(nextTail);
>>> +
>>> +    auto element = nextTail->element;
>>> +    nextTail->element = nullptr;
>>> +
>>> +    tail = nextTail;
>>> +    delete currentTail;
>>> +
>>> +    return element;
>>> +  }
>>> +
>>> +  // Single consumer only.
>>> +  //
>>> +  // TODO(drexin): Provide C++ style iteration so someone can just use
>>> +  // the `std::for_each()`.
>>> +  template <typename F>
>>> +  void for_each(F&& f)
>>> +  {
>>> +    auto end = head.load();
>>> +    auto node = tail;
>>> +
>>> +    for (;;) {
>>> +      node = node->next.load();
>>> +
>>> +      // We are following the linked structure until we reach the end
>>> +      // node. There is a race with new nodes being added, so we limit
>>> +      // the traversal to the last node at the time we started.
>>> +      if (node == nullptr) {
>>> +        return;
>>> +      }
>>> +
>>> +      f(node->element);
>>> +
>>> +      if (node == end) {
>>> +        return;
>>> +      }
>>> +    }
>>> +  }
>>> +
>>> +  // Single consumer only.
>>> +  bool empty()
>>> +  {
>>> +    return tail->next.load(std::memory_order_relaxed) == nullptr &&
>>> +      head.load(std::memory_order_relaxed) == tail;
>>> +  }
>>> +
>>> +private:
>>> +  std::atomic<Node<T>*> head;
>>> +
>>> +  // TODO(drexin): Programatically get the cache line size.
>>> +  alignas(128) Node<T>* tail;
>>> +};
>>> +
>>> +} // namespace process {
>>> +
>>> +#endif // __MPSC_LINKED_QUEUE_HPP__
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> index 25a34f9..5814bc6 100644
>>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>>> limiter_tests.cpp
>>> loop_tests.cpp
>>> metrics_tests.cpp
>>> +  mpsc_linked_queue_tests.cpp
>>> mutex_tests.cpp
>>> owned_tests.cpp
>>> process_tests.cpp
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> index 2ec0d42..e8ef21f 100644
>>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> @@ -22,6 +22,7 @@
>>> #include <iostream>
>>> #include <memory>
>>> #include <string>
>>> +#include <thread>
>>> #include <vector>
>>> 
>>> #include <process/collect.hpp>
>>> @@ -40,6 +41,8 @@
>>> 
>>> #include "benchmarks.pb.h"
>>> 
>>> +#include "mpsc_linked_queue.hpp"
>>> +
>>> namespace http = process::http;
>>> 
>>> using process::CountDownLatch;
>>> @@ -567,7 +570,6 @@ private:
>>> long count = 0;
>>> };
>>> 
>>> -
>>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
>>> {
>>> constexpr long repeats = 100000;
>>> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>>>   process.run(num_submessages);
>>> }
>>> }
>>> +
>>> +
>>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
>>> +{
>>> +  // NOTE: we set the total number of producers to be 1 less than the
>>> +  // hardware concurrency so the consumer doesn't have to fight for
>>> +  // processing time with the producers.
>>> +  const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
>>> +  const int messageCount = 10000000;
>>> +  const int totalCount = messageCount * producerCount;
>>> +  std::string* s = new std::string("");
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +
>>> +  Stopwatch consumerWatch;
>>> +
>>> +  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
>>> +    consumerWatch.start();
>>> +    for (int i = totalCount; i > 0;) {
>>> +      if (q.dequeue() != nullptr) {
>>> +        i--;
>>> +      }
>>> +    }
>>> +    consumerWatch.stop();
>>> +  });
>>> +
>>> +  std::vector<std::thread> producers;
>>> +
>>> +  Stopwatch producerWatch;
>>> +  producerWatch.start();
>>> +
>>> +  for (unsigned int t = 0; t < producerCount; t++) {
>>> +    producers.push_back(std::thread([messageCount, s, &q]() {
>>> +      for (int i = 0; i < messageCount; i++) {
>>> +        q.enqueue(s);
>>> +      }
>>> +    }));
>>> +  }
>>> +
>>> +  for (std::thread& producer : producers) {
>>> +    producer.join();
>>> +  }
>>> +
>>> +  producerWatch.stop();
>>> +
>>> +  consumer.join();
>>> +
>>> +  Duration producerElapsed = producerWatch.elapsed();
>>> +  Duration consumerElapsed = consumerWatch.elapsed();
>>> +
>>> +  double consumerThroughput = (double) totalCount / consumerElapsed.secs();
>>> +  double producerThroughput = (double) totalCount / producerElapsed.secs();
>>> +  double throughput = consumerThroughput + producerThroughput;
>>> +
>>> +  cout << "Estimated producer throughput (" << producerCount << " threads): "
>>> +       << std::fixed << producerThroughput << " op/s" << endl;
>>> +  cout << "Estimated consumer throughput: "
>>> +       << std::fixed << consumerThroughput << " op/s" << endl;
>>> +  cout << "Estimated total throughput: "
>>> +       << std::fixed << throughput << " op/s" << endl;
>>> +}
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> new file mode 100644
>>> index 0000000..7699974
>>> --- /dev/null
>>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> @@ -0,0 +1,104 @@
>>> +// Licensed under the Apache License, Version 2.0 (the "License");
>>> +// you may not use this file except in compliance with the License.
>>> +// You may obtain a copy of the License at
>>> +//
>>> +//     http://www.apache.org/licenses/LICENSE-2.0
>>> +//
>>> +// Unless required by applicable law or agreed to in writing, software
>>> +// distributed under the License is distributed on an "AS IS" BASIS,
>>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> +// See the License for the specific language governing permissions and
>>> +// limitations under the License
>>> +
>>> +#include <thread>
>>> +
>>> +#include <stout/gtest.hpp>
>>> +#include <stout/stringify.hpp>
>>> +
>>> +#include "mpsc_linked_queue.hpp"
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  std::string* s = new std::string("test");
>>> +  q.enqueue(s);
>>> +  std::string* s2 = q.dequeue();
>>> +  ASSERT_EQ(s, s2);
>>> +  delete s2;
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  for (int i = 0; i < 20; i++) {
>>> +    q.enqueue(new std::string(stringify(i)));
>>> +  }
>>> +
>>> +  for (int i = 0; i < 20; i++) {
>>> +    std::string* s = q.dequeue();
>>> +    ASSERT_EQ(*s, stringify(i));
>>> +    delete s;
>>> +  }
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  std::vector<std::thread> threads;
>>> +  for (int t = 0; t < 5; t++) {
>>> +    threads.push_back(
>>> +        std::thread([t, &q]() {
>>> +          int start = t * 1000;
>>> +          int end = start + 1000;
>>> +          for (int i = start; i < end; i++) {
>>> +            q.enqueue(new std::string(stringify(i)));
>>> +          }
>>> +        }));
>>> +  }
>>> +
>>> +  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
>>> +    t.join();
>>> +  });
>>> +
>>> +  std::set<std::string> elements;
>>> +
>>> +  std::string* s = nullptr;
>>> +  while ((s = q.dequeue()) != nullptr) {
>>> +    elements.insert(*s);
>>> +  }
>>> +
>>> +  ASSERT_EQ(5000UL, elements.size());
>>> +
>>> +  for (int i = 0; i < 5000; i++) {
>>> +    ASSERT_NE(elements.end(), elements.find(stringify(i)));
>>> +  }
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, ForEach)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  for (int i = 0; i < 20; i++) {
>>> +    q.enqueue(new std::string(stringify(i)));
>>> +  }
>>> +  int i = 0;
>>> +  q.for_each([&](std::string* s) {
>>> +    ASSERT_EQ(*s, stringify(i++));
>>> +  });
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, Empty)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  ASSERT_TRUE(q.empty());
>>> +  std::string* s = new std::string("test");
>>> +  q.enqueue(s);
>>> +  ASSERT_FALSE(q.empty());
>>> +  q.dequeue();
>>> +  ASSERT_TRUE(q.empty());
>>> +  delete s;
>>> +}
>>> 
>> 
> 


Re: mesos git commit: Added mpsc_linked_queue and use it as the concurrent event queue.

Posted by Dario Rexin <dr...@apple.com>.
Hi Benjamin,

see comments inline.

> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier <be...@mesosphere.io> wrote:
> 
> Hi Dario,
> 
> this patch introduced two new clang-tidy warnings. Could we try to get these down to zero, even if the code does not look bad?
> 
> 
> I already created a patch for the unused lambda capture,
> 
>    https://reviews.apache.org/r/67927/
> 
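> While the code does look reasonable, C++ has a somewhat odd exception that allows a lambda to reference certain variables (e.g., `const` variables usable in constant expressions) without capturing them, so an explicit capture of such a variable gets flagged as unused.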
> 

Yes, I was a bit confused by the MSVC error there. I added the explicit capture because I thought it would be preferable to an implicit capture, but I’m fine with either approach. Thanks for creating the patch!

> 
> I also looked into the warning on the “excessive padding”. Adding some explicit padding seems to make clang-tidy content, but I wasn’t sure whether we just wanted to put `head` and `tail` on separate cache lines, or also cared about the padding added after `tail`.
> 
>   private:
>      std::atomic<Node<T>*> head;
> 
>      char padding[128 - sizeof(std::atomic<Node<T>*>)];
> 
>      // TODO(drexin): Programatically get the cache line size.
>      alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
> 
> Could you put up a patch for that? You can run the linter yourself; it is `support/mesos-tidy.sh`.
> 

That’s interesting. The padding after `tail` is a good point; we should definitely add it to prevent false sharing. If we add the explicit padding, is `alignas` still necessary?

Thanks,
Dario

> 
> Cheers,
> 
> Benjamin
> 
> 
>> On Jul 15, 2018, at 7:02 PM, benh@apache.org wrote:
>> 
>> Repository: mesos
>> Updated Branches:
>> refs/heads/master a11a6a3d8 -> b1eafc035
>> 
>> 
>> Added mpsc_linked_queue and use it as the concurrent event queue.
>> 
>> https://reviews.apache.org/r/62515
>> 
>> 
>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>> 
>> Branch: refs/heads/master
>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
>> Parents: a11a6a3
>> Author: Dario Rexin <dr...@apple.com>
>> Authored: Sat Jul 7 13:20:22 2018 -0700
>> Committer: Benjamin Hindman <be...@gmail.com>
>> Committed: Sun Jul 15 09:55:28 2018 -0700
>> 
>> ----------------------------------------------------------------------
>> 3rdparty/libprocess/Makefile.am                 |   1 +
>> 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
>> 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
>> 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
>> .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
>> 6 files changed, 367 insertions(+), 150 deletions(-)
>> ----------------------------------------------------------------------
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
>> index 2d356aa..631491a 100644
>> --- a/3rdparty/libprocess/Makefile.am
>> +++ b/3rdparty/libprocess/Makefile.am
>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES =					\
>>  src/tests/loop_tests.cpp					\
>>  src/tests/main.cpp						\
>>  src/tests/metrics_tests.cpp					\
>> +  src/tests/mpsc_linked_queue_tests.cpp				\
>>  src/tests/mutex_tests.cpp					\
>>  src/tests/owned_tests.cpp					\
>>  src/tests/process_tests.cpp					\
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
>> index 21c522d..999d552 100644
>> --- a/3rdparty/libprocess/src/event_queue.hpp
>> +++ b/3rdparty/libprocess/src/event_queue.hpp
>> @@ -17,10 +17,6 @@
>> #include <mutex>
>> #include <string>
>> 
>> -#ifdef LOCK_FREE_EVENT_QUEUE
>> -#include <concurrentqueue.h>
>> -#endif // LOCK_FREE_EVENT_QUEUE
>> -
>> #include <process/event.hpp>
>> #include <process/http.hpp>
>> 
>> @@ -28,6 +24,10 @@
>> #include <stout/stringify.hpp>
>> #include <stout/synchronized.hpp>
>> 
>> +#ifdef LOCK_FREE_EVENT_QUEUE
>> +#include "mpsc_linked_queue.hpp"
>> +#endif // LOCK_FREE_EVENT_QUEUE
>> +
>> namespace process {
>> 
>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
>> @@ -187,185 +187,55 @@ private:
>> #else // LOCK_FREE_EVENT_QUEUE
>>  void enqueue(Event* event)
>>  {
>> -    Item item = {sequence.fetch_add(1), event};
>>    if (comissioned.load()) {
>> -      queue.enqueue(std::move(item));
>> +      queue.enqueue(event);
>>    } else {
>> -      sequence.fetch_sub(1);
>>      delete event;
>>    }
>>  }
>> 
>>  Event* dequeue()
>>  {
>> -    // NOTE: for performance reasons we don't check `comissioned` here
>> -    // so it's possible that we'll loop forever if a consumer called
>> -    // `decomission()` and then subsequently called `dequeue()`.
>> -    Event* event = nullptr;
>> -    do {
>> -      // Given the nature of the concurrent queue implementation it's
>> -      // possible that we'll need to try to dequeue multiple times
>> -      // until it returns an event even though we know there is an
>> -      // event because the semantics are that we shouldn't call
>> -      // `dequeue()` before calling `empty()`.
>> -      event = try_dequeue();
>> -    } while (event == nullptr);
>> -    return event;
>> +    return queue.dequeue();
>>  }
>> 
>>  bool empty()
>>  {
>> -    // NOTE: for performance reasons we don't check `comissioned` here
>> -    // so it's possible that we'll return true when in fact we've been
>> -    // decomissioned and you shouldn't attempt to dequeue anything.
>> -    return (sequence.load() - next) == 0;
>> +    return queue.empty();
>>  }
>> 
>>  void decomission()
>>  {
>>    comissioned.store(true);
>>    while (!empty()) {
>> -      // NOTE: we use `try_dequeue()` here because we might be racing
>> -      // with `enqueue()` where they've already incremented `sequence`
>> -      // so we think there are more items to dequeue but they aren't
>> -      // actually going to enqueue anything because they've since seen
>> -      // `comissioned` is true. We'll attempt to dequeue with
>> -      // `try_dequeue()` and eventually they'll decrement `sequence`
>> -      // and so `empty()` will return true and we'll bail.
>> -      Event* event = try_dequeue();
>> -      if (event != nullptr) {
>> -        delete event;
>> -      }
>> +      delete dequeue();
>>    }
>>  }
>> 
>>  template <typename T>
>>  size_t count()
>>  {
>> -    // Try and dequeue more elements first!
>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>> -
>> -    return std::count_if(
>> -        items.begin(),
>> -        items.end(),
>> -        [](const Item& item) {
>> -          if (item.event != nullptr) {
>> -            return item.event->is<T>();
>> -          }
>> -          return false;
>> -        });
>> +    size_t count = 0;
>> +    queue.for_each([&count](Event* event) {
>> +      if (event->is<T>()) {
>> +        count++;
>> +      }
>> +    });
>> +    return count;
>>  }
>> 
>>  operator JSON::Array()
>>  {
>> -    // Try and dequeue more elements first!
>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>> -
>>    JSON::Array array;
>> -    foreach (const Item& item, items) {
>> -      if (item.event != nullptr) {
>> -        array.values.push_back(JSON::Object(*item.event));
>> -      }
>> -    }
>> +    queue.for_each([&array](Event* event) {
>> +      array.values.push_back(JSON::Object(*event));
>> +    });
>> 
>>    return array;
>>  }
>> 
>> -  struct Item
>> -  {
>> -    uint64_t sequence;
>> -    Event* event;
>> -  };
>> -
>> -  Event* try_dequeue()
>> -  {
>> -    // The general algoritm here is as follows: we bulk dequeue as
>> -    // many items from the concurrent queue as possible. We then look
>> -    // for the `next` item in the sequence hoping that it's at the
>> -    // beginning of `items` but because the `queue` is not
>> -    // linearizable it might be "out of order". If we find it out of
>> -    // order we effectively dequeue it but leave it in `items` so as
>> -    // not to incur any costly rearrangements/compactions in
>> -    // `items`. We'll later pop the out of order items once they get
>> -    // to the front.
>> -
>> -    // Start by popping any items that we effectively dequeued but
>> -    // didn't remove from `items` so as not to incur costly
>> -    // rearragements/compactions.
>> -    while (!items.empty() && next > items.front().sequence) {
>> -      items.pop_front();
>> -    }
>> -
>> -    // Optimistically let's hope that the next item is at the front of
>> -    // `item`. If so, pop the item, increment `next`, and return the
>> -    // event.
>> -    if (!items.empty() && items.front().sequence == next) {
>> -      Event* event = items.front().event;
>> -      items.pop_front();
>> -      next += 1;
>> -      return event;
>> -    }
>> -
>> -    size_t index = 0;
>> -
>> -    do {
>> -      // Now look for a potentially out of order item. If found,
>> -      //  signifiy the item has been dequeued by nulling the event
>> -      //  (necessary for the implementation of `count()` and `operator
>> -      //  JSON::Array()`) and return the event.
>> -      for (; index < items.size(); index++) {
>> -        if (items[index].sequence == next) {
>> -          Event* event = items[index].event;
>> -          items[index].event = nullptr;
>> -          next += 1;
>> -          return event;
>> -        }
>> -      }
>> -
>> -      // If we can bulk dequeue more items then keep looking for the
>> -      // out of order event!
>> -      //
>> -      // NOTE: we use the _small_ value of `4` to dequeue here since
>> -      // in the presence of enough events being enqueued we could end
>> -      // up spending a LONG time dequeuing here! Since the next event
>> -      // in the sequence should really be close to the top of the
>> -      // queue we use a small value to dequeue.
>> -      //
>> -      // The intuition here is this: the faster we can return the next
>> -      // event the faster that event can get processed and the faster
>> -      // it might generate other events that can get processed in
>> -      // parallel by other threads and the more work we get done.
>> -    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
>> -
>> -    return nullptr;
>> -  }
>> -
>>  // Underlying queue of items.
>> -  moodycamel::ConcurrentQueue<Item> queue;
>> -
>> -  // Counter to represent the item sequence. Note that we use a
>> -  // unsigned 64-bit integer which means that even if we were adding
>> -  // one item to the queue every nanosecond we'd be able to run for
>> -  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
>> -  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
>> -
>> -  // Counter to represent the next item we expect to dequeue. Note
>> -  // that we don't need to make this be atomic because only a single
>> -  // consumer is ever reading or writing this variable!
>> -  uint64_t next = 0;
>> -
>> -  // Collection of bulk dequeued items that may be out of order. Note
>> -  // that like `next` this will only ever be read/written by a single
>> -  // consumer.
>> -  //
>> -  // The use of a deque was explicit because it is implemented as an
>> -  // array of arrays (or vector of vectors) which usually gives good
>> -  // performance for appending to the back and popping from the front
>> -  // which is exactly what we need to do. To avoid any performance
>> -  // issues that might be incurred we do not remove any items from the
>> -  // middle of the deque (see comments in `try_dequeue()` above for
>> -  // more details).
>> -  std::deque<Item> items;
>> +  MpscLinkedQueue<Event> queue;
>> 
>>  // Whether or not the event queue has been decomissioned. This must
>>  // be atomic as it can be read by a producer even though it's only
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> new file mode 100644
>> index 0000000..48c9509
>> --- /dev/null
>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> @@ -0,0 +1,179 @@
>> +// Licensed under the Apache License, Version 2.0 (the "License");
>> +// you may not use this file except in compliance with the License.
>> +// You may obtain a copy of the License at
>> +//
>> +//     http://www.apache.org/licenses/LICENSE-2.0
>> +//
>> +// Unless required by applicable law or agreed to in writing, software
>> +// distributed under the License is distributed on an "AS IS" BASIS,
>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +// See the License for the specific language governing permissions and
>> +// limitations under the License
>> +
>> +#ifndef __MPSC_LINKED_QUEUE_HPP__
>> +#define __MPSC_LINKED_QUEUE_HPP__
>> +
>> +#include <atomic>
>> +#include <functional>
>> +
>> +#include <glog/logging.h>
>> +
>> +namespace process {
>> +
>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
>> +// the core methods:
>> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
>> +//
>> +// which is a Java port of the MPSC algorithm as presented in following article:
>> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
>> +//
>> +// The queue has following properties:
>> +//   Producers are wait-free (one atomic exchange per enqueue)
>> +//   Consumer is
>> +//     - lock-free
>> +//     - mostly wait-free, except when consumer reaches the end of the queue
>> +//       and producer enqueued a new node, but did not update the next pointer
>> +//       on the old node, yet
>> +template <typename T>
>> +class MpscLinkedQueue
>> +{
>> +private:
>> +  template <typename E>
>> +  struct Node
>> +  {
>> +  public:
>> +    explicit Node(E* element = nullptr) : element(element) {}
>> +
>> +    E* element;
>> +    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
>> +  };
>> +
>> +public:
>> +  MpscLinkedQueue()
>> +  {
>> +    tail = new Node<T>();
>> +    head.store(tail);
>> +  }
>> +
>> +  ~MpscLinkedQueue()
>> +  {
>> +    while (auto element = dequeue()) {
>> +      delete element;
>> +    }
>> +
>> +    delete tail;
>> +  }
>> +
>> +  // Multi producer safe.
>> +  void enqueue(T* element)
>> +  {
>> +    // A `nullptr` is used to denote an empty queue when doing a
>> +    // `dequeue()` so producers can't use it as an element.
>> +    CHECK_NOTNULL(element);
>> +
>> +    auto newNode = new Node<T>(element);
>> +
>> +    // Exchange is guaranteed to only give the old value to one
>> +    // producer, so this is safe and wait-free.
>> +    auto oldhead = head.exchange(newNode, std::memory_order_release);
>> +
>> +    // At this point if this thread context switches out we may block
>> +    // the consumer from doing a dequeue (see below). Eventually we'll
>> +    // unblock the consumer once we run again and execute the next
>> +    // line of code.
>> +    oldhead->next.store(newNode, std::memory_order_release);
>> +  }
>> +
>> +  // Single consumer only.
>> +  T* dequeue()
>> +  {
>> +    auto currentTail = tail;
>> +
>> +    // Check and see if there is an actual element linked from `tail`
>> +    // since we use `tail` as a "stub" rather than the actual element.
>> +    auto nextTail = currentTail->next.exchange(
>> +        nullptr,
>> +        std::memory_order_relaxed);
>> +
>> +    // There are three possible cases here:
>> +    //
>> +    // (1) The queue is empty.
>> +    // (2) The queue appears empty but a producer is still enqueuing
>> +    //     so let's wait for it and then dequeue.
>> +    // (3) We have something to dequeue.
>> +    //
>> +    // Start by checking if the queue is or appears empty.
>> +    if (nextTail == nullptr) {
>> +      // Now check if the queue is actually empty or just appears
>> +      // empty. If it's actually empty then return `nullptr` to denote
>> +      // emptiness.
>> +      if (head.load(std::memory_order_relaxed) == tail) {
>> +        return nullptr;
>> +      }
>> +
>> +      // Another thread already inserted a new node, but did not
>> +      // connect it to the tail, yet, so we spin-wait. At this point
>> +      // we are not wait-free anymore.
>> +      do {
>> +        nextTail = currentTail->next.exchange(
>> +            nullptr,
>> +            std::memory_order_relaxed);
>> +      } while (nextTail == nullptr);
>> +    }
>> +
>> +    CHECK_NOTNULL(nextTail);
>> +
>> +    auto element = nextTail->element;
>> +    nextTail->element = nullptr;
>> +
>> +    tail = nextTail;
>> +    delete currentTail;
>> +
>> +    return element;
>> +  }
>> +
>> +  // Single consumer only.
>> +  //
>> +  // TODO(drexin): Provide C++ style iteration so someone can just use
>> +  // the `std::for_each()`.
>> +  template <typename F>
>> +  void for_each(F&& f)
>> +  {
>> +    auto end = head.load();
>> +    auto node = tail;
>> +
>> +    for (;;) {
>> +      node = node->next.load();
>> +
>> +      // We are following the linked structure until we reach the end
>> +      // node. There is a race with new nodes being added, so we limit
>> +      // the traversal to the last node at the time we started.
>> +      if (node == nullptr) {
>> +        return;
>> +      }
>> +
>> +      f(node->element);
>> +
>> +      if (node == end) {
>> +        return;
>> +      }
>> +    }
>> +  }
>> +
>> +  // Single consumer only.
>> +  bool empty()
>> +  {
>> +    return tail->next.load(std::memory_order_relaxed) == nullptr &&
>> +      head.load(std::memory_order_relaxed) == tail;
>> +  }
>> +
>> +private:
>> +  std::atomic<Node<T>*> head;
>> +
>> +  // TODO(drexin): Programatically get the cache line size.
>> +  alignas(128) Node<T>* tail;
>> +};
>> +
>> +} // namespace process {
>> +
>> +#endif // __MPSC_LINKED_QUEUE_HPP__
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
>> index 25a34f9..5814bc6 100644
>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>>  limiter_tests.cpp
>>  loop_tests.cpp
>>  metrics_tests.cpp
>> +  mpsc_linked_queue_tests.cpp
>>  mutex_tests.cpp
>>  owned_tests.cpp
>>  process_tests.cpp
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> index 2ec0d42..e8ef21f 100644
>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> @@ -22,6 +22,7 @@
>> #include <iostream>
>> #include <memory>
>> #include <string>
>> +#include <thread>
>> #include <vector>
>> 
>> #include <process/collect.hpp>
>> @@ -40,6 +41,8 @@
>> 
>> #include "benchmarks.pb.h"
>> 
>> +#include "mpsc_linked_queue.hpp"
>> +
>> namespace http = process::http;
>> 
>> using process::CountDownLatch;
>> @@ -567,7 +570,6 @@ private:
>>  long count = 0;
>> };
>> 
>> -
>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
>> {
>>  constexpr long repeats = 100000;
>> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>>    process.run(num_submessages);
>>  }
>> }
>> +
>> +
>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
>> +{
>> +  // NOTE: we set the total number of producers to be 1 less than the
>> +  // hardware concurrency so the consumer doesn't have to fight for
>> +  // processing time with the producers.
>> +  const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
>> +  const int messageCount = 10000000;
>> +  const int totalCount = messageCount * producerCount;
>> +  std::string* s = new std::string("");
>> +  process::MpscLinkedQueue<std::string> q;
>> +
>> +  Stopwatch consumerWatch;
>> +
>> +  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
>> +    consumerWatch.start();
>> +    for (int i = totalCount; i > 0;) {
>> +      if (q.dequeue() != nullptr) {
>> +        i--;
>> +      }
>> +    }
>> +    consumerWatch.stop();
>> +  });
>> +
>> +  std::vector<std::thread> producers;
>> +
>> +  Stopwatch producerWatch;
>> +  producerWatch.start();
>> +
>> +  for (unsigned int t = 0; t < producerCount; t++) {
>> +    producers.push_back(std::thread([messageCount, s, &q]() {
>> +      for (int i = 0; i < messageCount; i++) {
>> +        q.enqueue(s);
>> +      }
>> +    }));
>> +  }
>> +
>> +  for (std::thread& producer : producers) {
>> +    producer.join();
>> +  }
>> +
>> +  producerWatch.stop();
>> +
>> +  consumer.join();
>> +
>> +  Duration producerElapsed = producerWatch.elapsed();
>> +  Duration consumerElapsed = consumerWatch.elapsed();
>> +
>> +  double consumerThroughput = (double) totalCount / consumerElapsed.secs();
>> +  double producerThroughput = (double) totalCount / producerElapsed.secs();
>> +  double throughput = consumerThroughput + producerThroughput;
>> +
>> +  cout << "Estimated producer throughput (" << producerCount << " threads): "
>> +       << std::fixed << producerThroughput << " op/s" << endl;
>> +  cout << "Estimated consumer throughput: "
>> +       << std::fixed << consumerThroughput << " op/s" << endl;
>> +  cout << "Estimated total throughput: "
>> +       << std::fixed << throughput << " op/s" << endl;
>> +}
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> new file mode 100644
>> index 0000000..7699974
>> --- /dev/null
>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> @@ -0,0 +1,104 @@
>> +// Licensed under the Apache License, Version 2.0 (the "License");
>> +// you may not use this file except in compliance with the License.
>> +// You may obtain a copy of the License at
>> +//
>> +//     http://www.apache.org/licenses/LICENSE-2.0
>> +//
>> +// Unless required by applicable law or agreed to in writing, software
>> +// distributed under the License is distributed on an "AS IS" BASIS,
>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +// See the License for the specific language governing permissions and
>> +// limitations under the License
>> +
>> +#include <thread>
>> +
>> +#include <stout/gtest.hpp>
>> +#include <stout/stringify.hpp>
>> +
>> +#include "mpsc_linked_queue.hpp"
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  std::string* s = new std::string("test");
>> +  q.enqueue(s);
>> +  std::string* s2 = q.dequeue();
>> +  ASSERT_EQ(s, s2);
>> +  delete s2;
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  for (int i = 0; i < 20; i++) {
>> +    q.enqueue(new std::string(stringify(i)));
>> +  }
>> +
>> +  for (int i = 0; i < 20; i++) {
>> +    std::string* s = q.dequeue();
>> +    ASSERT_EQ(*s, stringify(i));
>> +    delete s;
>> +  }
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  std::vector<std::thread> threads;
>> +  for (int t = 0; t < 5; t++) {
>> +    threads.push_back(
>> +        std::thread([t, &q]() {
>> +          int start = t * 1000;
>> +          int end = start + 1000;
>> +          for (int i = start; i < end; i++) {
>> +            q.enqueue(new std::string(stringify(i)));
>> +          }
>> +        }));
>> +  }
>> +
>> +  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
>> +    t.join();
>> +  });
>> +
>> +  std::set<std::string> elements;
>> +
>> +  std::string* s = nullptr;
>> +  while ((s = q.dequeue()) != nullptr) {
>> +    elements.insert(*s);
>> +  }
>> +
>> +  ASSERT_EQ(5000UL, elements.size());
>> +
>> +  for (int i = 0; i < 5000; i++) {
>> +    ASSERT_NE(elements.end(), elements.find(stringify(i)));
>> +  }
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, ForEach)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  for (int i = 0; i < 20; i++) {
>> +    q.enqueue(new std::string(stringify(i)));
>> +  }
>> +  int i = 0;
>> +  q.for_each([&](std::string* s) {
>> +    ASSERT_EQ(*s, stringify(i++));
>> +  });
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, Empty)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  ASSERT_TRUE(q.empty());
>> +  std::string* s = new std::string("test");
>> +  q.enqueue(s);
>> +  ASSERT_FALSE(q.empty());
>> +  q.dequeue();
>> +  ASSERT_TRUE(q.empty());
>> +  delete s;
>> +}
>> 
> 


Re: mesos git commit: Added mpsc_linked_queue and use it as the concurrent event queue.

Posted by Benjamin Bannier <be...@mesosphere.io>.
Hi Dario,

this patch introduced two new clang-tidy warnings. Could we try to get these down to zero, even if the code does not look bad?


I already created a patch for the unused lambda capture,

    https://reviews.apache.org/r/67927/

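While the code does look reasonable, C++ has a somewhat odd exception here: a lambda may read a `const` integral variable that is initialized with a constant expression (such as `messageCount` in the new benchmark) without capturing it, so clang-tidy reports the explicit capture as unused.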


I also looked into the warning on the “excessive padding”. Adding explicit padding between the two members seems to satisfy clang-tidy, but I wasn’t sure whether the intent was only to put `head` and `tail` on separate cache lines, or whether we also care about the trailing padding that `alignas(128)` adds after `tail`.

   private:
      std::atomic<Node<T>*> head;

      char padding[128 - sizeof(std::atomic<Node<T>*>)];

      // TODO(drexin): Programmatically get the cache line size.
      alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.

Could you put up a patch for that? You can run the linter yourself; it is `support/mesos-tidy.sh`.


Cheers,

Benjamin


> On Jul 15, 2018, at 7:02 PM, benh@apache.org wrote:
> 
> Repository: mesos
> Updated Branches:
>  refs/heads/master a11a6a3d8 -> b1eafc035
> 
> 
> Added mpsc_linked_queue and use it as the concurrent event queue.
> 
> https://reviews.apache.org/r/62515
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
> 
> Branch: refs/heads/master
> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
> Parents: a11a6a3
> Author: Dario Rexin <dr...@apple.com>
> Authored: Sat Jul 7 13:20:22 2018 -0700
> Committer: Benjamin Hindman <be...@gmail.com>
> Committed: Sun Jul 15 09:55:28 2018 -0700
> 
> ----------------------------------------------------------------------
> 3rdparty/libprocess/Makefile.am                 |   1 +
> 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
> 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
> 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
> 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
> .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
> 6 files changed, 367 insertions(+), 150 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
> index 2d356aa..631491a 100644
> --- a/3rdparty/libprocess/Makefile.am
> +++ b/3rdparty/libprocess/Makefile.am
> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES =					\
>   src/tests/loop_tests.cpp					\
>   src/tests/main.cpp						\
>   src/tests/metrics_tests.cpp					\
> +  src/tests/mpsc_linked_queue_tests.cpp				\
>   src/tests/mutex_tests.cpp					\
>   src/tests/owned_tests.cpp					\
>   src/tests/process_tests.cpp					\
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
> index 21c522d..999d552 100644
> --- a/3rdparty/libprocess/src/event_queue.hpp
> +++ b/3rdparty/libprocess/src/event_queue.hpp
> @@ -17,10 +17,6 @@
> #include <mutex>
> #include <string>
> 
> -#ifdef LOCK_FREE_EVENT_QUEUE
> -#include <concurrentqueue.h>
> -#endif // LOCK_FREE_EVENT_QUEUE
> -
> #include <process/event.hpp>
> #include <process/http.hpp>
> 
> @@ -28,6 +24,10 @@
> #include <stout/stringify.hpp>
> #include <stout/synchronized.hpp>
> 
> +#ifdef LOCK_FREE_EVENT_QUEUE
> +#include "mpsc_linked_queue.hpp"
> +#endif // LOCK_FREE_EVENT_QUEUE
> +
> namespace process {
> 
> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
> @@ -187,185 +187,55 @@ private:
> #else // LOCK_FREE_EVENT_QUEUE
>   void enqueue(Event* event)
>   {
> -    Item item = {sequence.fetch_add(1), event};
>     if (comissioned.load()) {
> -      queue.enqueue(std::move(item));
> +      queue.enqueue(event);
>     } else {
> -      sequence.fetch_sub(1);
>       delete event;
>     }
>   }
> 
>   Event* dequeue()
>   {
> -    // NOTE: for performance reasons we don't check `comissioned` here
> -    // so it's possible that we'll loop forever if a consumer called
> -    // `decomission()` and then subsequently called `dequeue()`.
> -    Event* event = nullptr;
> -    do {
> -      // Given the nature of the concurrent queue implementation it's
> -      // possible that we'll need to try to dequeue multiple times
> -      // until it returns an event even though we know there is an
> -      // event because the semantics are that we shouldn't call
> -      // `dequeue()` before calling `empty()`.
> -      event = try_dequeue();
> -    } while (event == nullptr);
> -    return event;
> +    return queue.dequeue();
>   }
> 
>   bool empty()
>   {
> -    // NOTE: for performance reasons we don't check `comissioned` here
> -    // so it's possible that we'll return true when in fact we've been
> -    // decomissioned and you shouldn't attempt to dequeue anything.
> -    return (sequence.load() - next) == 0;
> +    return queue.empty();
>   }
> 
>   void decomission()
>   {
>     comissioned.store(true);
>     while (!empty()) {
> -      // NOTE: we use `try_dequeue()` here because we might be racing
> -      // with `enqueue()` where they've already incremented `sequence`
> -      // so we think there are more items to dequeue but they aren't
> -      // actually going to enqueue anything because they've since seen
> -      // `comissioned` is true. We'll attempt to dequeue with
> -      // `try_dequeue()` and eventually they'll decrement `sequence`
> -      // and so `empty()` will return true and we'll bail.
> -      Event* event = try_dequeue();
> -      if (event != nullptr) {
> -        delete event;
> -      }
> +      delete dequeue();
>     }
>   }
> 
>   template <typename T>
>   size_t count()
>   {
> -    // Try and dequeue more elements first!
> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
> -    return std::count_if(
> -        items.begin(),
> -        items.end(),
> -        [](const Item& item) {
> -          if (item.event != nullptr) {
> -            return item.event->is<T>();
> -          }
> -          return false;
> -        });
> +    size_t count = 0;
> +    queue.for_each([&count](Event* event) {
> +      if (event->is<T>()) {
> +        count++;
> +      }
> +    });
> +    return count;
>   }
> 
>   operator JSON::Array()
>   {
> -    // Try and dequeue more elements first!
> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
>     JSON::Array array;
> -    foreach (const Item& item, items) {
> -      if (item.event != nullptr) {
> -        array.values.push_back(JSON::Object(*item.event));
> -      }
> -    }
> +    queue.for_each([&array](Event* event) {
> +      array.values.push_back(JSON::Object(*event));
> +    });
> 
>     return array;
>   }
> 
> -  struct Item
> -  {
> -    uint64_t sequence;
> -    Event* event;
> -  };
> -
> -  Event* try_dequeue()
> -  {
> -    // The general algoritm here is as follows: we bulk dequeue as
> -    // many items from the concurrent queue as possible. We then look
> -    // for the `next` item in the sequence hoping that it's at the
> -    // beginning of `items` but because the `queue` is not
> -    // linearizable it might be "out of order". If we find it out of
> -    // order we effectively dequeue it but leave it in `items` so as
> -    // not to incur any costly rearrangements/compactions in
> -    // `items`. We'll later pop the out of order items once they get
> -    // to the front.
> -
> -    // Start by popping any items that we effectively dequeued but
> -    // didn't remove from `items` so as not to incur costly
> -    // rearragements/compactions.
> -    while (!items.empty() && next > items.front().sequence) {
> -      items.pop_front();
> -    }
> -
> -    // Optimistically let's hope that the next item is at the front of
> -    // `item`. If so, pop the item, increment `next`, and return the
> -    // event.
> -    if (!items.empty() && items.front().sequence == next) {
> -      Event* event = items.front().event;
> -      items.pop_front();
> -      next += 1;
> -      return event;
> -    }
> -
> -    size_t index = 0;
> -
> -    do {
> -      // Now look for a potentially out of order item. If found,
> -      //  signifiy the item has been dequeued by nulling the event
> -      //  (necessary for the implementation of `count()` and `operator
> -      //  JSON::Array()`) and return the event.
> -      for (; index < items.size(); index++) {
> -        if (items[index].sequence == next) {
> -          Event* event = items[index].event;
> -          items[index].event = nullptr;
> -          next += 1;
> -          return event;
> -        }
> -      }
> -
> -      // If we can bulk dequeue more items then keep looking for the
> -      // out of order event!
> -      //
> -      // NOTE: we use the _small_ value of `4` to dequeue here since
> -      // in the presence of enough events being enqueued we could end
> -      // up spending a LONG time dequeuing here! Since the next event
> -      // in the sequence should really be close to the top of the
> -      // queue we use a small value to dequeue.
> -      //
> -      // The intuition here is this: the faster we can return the next
> -      // event the faster that event can get processed and the faster
> -      // it might generate other events that can get processed in
> -      // parallel by other threads and the more work we get done.
> -    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
> -
> -    return nullptr;
> -  }
> -
>   // Underlying queue of items.
> -  moodycamel::ConcurrentQueue<Item> queue;
> -
> -  // Counter to represent the item sequence. Note that we use a
> -  // unsigned 64-bit integer which means that even if we were adding
> -  // one item to the queue every nanosecond we'd be able to run for
> -  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
> -  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
> -
> -  // Counter to represent the next item we expect to dequeue. Note
> -  // that we don't need to make this be atomic because only a single
> -  // consumer is ever reading or writing this variable!
> -  uint64_t next = 0;
> -
> -  // Collection of bulk dequeued items that may be out of order. Note
> -  // that like `next` this will only ever be read/written by a single
> -  // consumer.
> -  //
> -  // The use of a deque was explicit because it is implemented as an
> -  // array of arrays (or vector of vectors) which usually gives good
> -  // performance for appending to the back and popping from the front
> -  // which is exactly what we need to do. To avoid any performance
> -  // issues that might be incurred we do not remove any items from the
> -  // middle of the deque (see comments in `try_dequeue()` above for
> -  // more details).
> -  std::deque<Item> items;
> +  MpscLinkedQueue<Event> queue;
> 
>   // Whether or not the event queue has been decomissioned. This must
>   // be atomic as it can be read by a producer even though it's only
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> new file mode 100644
> index 0000000..48c9509
> --- /dev/null
> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> @@ -0,0 +1,179 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +//     http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#ifndef __MPSC_LINKED_QUEUE_HPP__
> +#define __MPSC_LINKED_QUEUE_HPP__
> +
> +#include <atomic>
> +#include <functional>
> +
> +#include <glog/logging.h>
> +
> +namespace process {
> +
> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
> +// the core methods:
> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
> +//
> +// which is a Java port of the MPSC algorithm as presented in following article:
> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
> +//
> +// The queue has following properties:
> +//   Producers are wait-free (one atomic exchange per enqueue)
> +//   Consumer is
> +//     - lock-free
> +//     - mostly wait-free, except when consumer reaches the end of the queue
> +//       and producer enqueued a new node, but did not update the next pointer
> +//       on the old node, yet
> +template <typename T>
> +class MpscLinkedQueue
> +{
> +private:
> +  template <typename E>
> +  struct Node
> +  {
> +  public:
> +    explicit Node(E* element = nullptr) : element(element) {}
> +
> +    E* element;
> +    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
> +  };
> +
> +public:
> +  MpscLinkedQueue()
> +  {
> +    tail = new Node<T>();
> +    head.store(tail);
> +  }
> +
> +  ~MpscLinkedQueue()
> +  {
> +    while (auto element = dequeue()) {
> +      delete element;
> +    }
> +
> +    delete tail;
> +  }
> +
> +  // Multi producer safe.
> +  void enqueue(T* element)
> +  {
> +    // A `nullptr` is used to denote an empty queue when doing a
> +    // `dequeue()` so producers can't use it as an element.
> +    CHECK_NOTNULL(element);
> +
> +    auto newNode = new Node<T>(element);
> +
> +    // Exchange is guaranteed to only give the old value to one
> +    // producer, so this is safe and wait-free.
> +    auto oldhead = head.exchange(newNode, std::memory_order_release);
> +
> +    // At this point if this thread context switches out we may block
> +    // the consumer from doing a dequeue (see below). Eventually we'll
> +    // unblock the consumer once we run again and execute the next
> +    // line of code.
> +    oldhead->next.store(newNode, std::memory_order_release);
> +  }
> +
> +  // Single consumer only.
> +  T* dequeue()
> +  {
> +    auto currentTail = tail;
> +
> +    // Check and see if there is an actual element linked from `tail`
> +    // since we use `tail` as a "stub" rather than the actual element.
> +    auto nextTail = currentTail->next.exchange(
> +        nullptr,
> +        std::memory_order_relaxed);
> +
> +    // There are three possible cases here:
> +    //
> +    // (1) The queue is empty.
> +    // (2) The queue appears empty but a producer is still enqueuing
> +    //     so let's wait for it and then dequeue.
> +    // (3) We have something to dequeue.
> +    //
> +    // Start by checking if the queue is or appears empty.
> +    if (nextTail == nullptr) {
> +      // Now check if the queue is actually empty or just appears
> +      // empty. If it's actually empty then return `nullptr` to denote
> +      // emptiness.
> +      if (head.load(std::memory_order_relaxed) == tail) {
> +        return nullptr;
> +      }
> +
> +      // Another thread already inserted a new node, but did not
> +      // connect it to the tail, yet, so we spin-wait. At this point
> +      // we are not wait-free anymore.
> +      do {
> +        nextTail = currentTail->next.exchange(
> +            nullptr,
> +            std::memory_order_relaxed);
> +      } while (nextTail == nullptr);
> +    }
> +
> +    CHECK_NOTNULL(nextTail);
> +
> +    auto element = nextTail->element;
> +    nextTail->element = nullptr;
> +
> +    tail = nextTail;
> +    delete currentTail;
> +
> +    return element;
> +  }
> +
> +  // Single consumer only.
> +  //
> +  // TODO(drexin): Provide C++ style iteration so someone can just use
> +  // the `std::for_each()`.
> +  template <typename F>
> +  void for_each(F&& f)
> +  {
> +    auto end = head.load();
> +    auto node = tail;
> +
> +    for (;;) {
> +      node = node->next.load();
> +
> +      // We are following the linked structure until we reach the end
> +      // node. There is a race with new nodes being added, so we limit
> +      // the traversal to the last node at the time we started.
> +      if (node == nullptr) {
> +        return;
> +      }
> +
> +      f(node->element);
> +
> +      if (node == end) {
> +        return;
> +      }
> +    }
> +  }
> +
> +  // Single consumer only.
> +  bool empty()
> +  {
> +    return tail->next.load(std::memory_order_relaxed) == nullptr &&
> +      head.load(std::memory_order_relaxed) == tail;
> +  }
> +
> +private:
> +  std::atomic<Node<T>*> head;
> +
> +  // TODO(drexin): Programatically get the cache line size.
> +  alignas(128) Node<T>* tail;
> +};
> +
> +} // namespace process {
> +
> +#endif // __MPSC_LINKED_QUEUE_HPP__
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
> index 25a34f9..5814bc6 100644
> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>   limiter_tests.cpp
>   loop_tests.cpp
>   metrics_tests.cpp
> +  mpsc_linked_queue_tests.cpp
>   mutex_tests.cpp
>   owned_tests.cpp
>   process_tests.cpp
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
> index 2ec0d42..e8ef21f 100644
> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
> @@ -22,6 +22,7 @@
> #include <iostream>
> #include <memory>
> #include <string>
> +#include <thread>
> #include <vector>
> 
> #include <process/collect.hpp>
> @@ -40,6 +41,8 @@
> 
> #include "benchmarks.pb.h"
> 
> +#include "mpsc_linked_queue.hpp"
> +
> namespace http = process::http;
> 
> using process::CountDownLatch;
> @@ -567,7 +570,6 @@ private:
>   long count = 0;
> };
> 
> -
> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
> {
>   constexpr long repeats = 100000;
> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>     process.run(num_submessages);
>   }
> }
> +
> +
> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
> +{
> +  // Measures the throughput of `MpscLinkedQueue` while several
> +  // producer threads enqueue concurrently and a single consumer
> +  // thread dequeues.
> +  //
> +  // NOTE: We set the total number of producers to be 1 less than the
> +  // hardware concurrency so the consumer doesn't have to fight for
> +  // processing time with the producers. `hardware_concurrency()` is
> +  // allowed to return 0 when the value is not computable and returns
> +  // 1 on single core machines, so we guard the unsigned subtraction
> +  // and always run at least one producer.
> +  const unsigned int hardwareConcurrency =
> +    std::thread::hardware_concurrency();
> +  const unsigned int producerCount =
> +    hardwareConcurrency > 1 ? hardwareConcurrency - 1 : 1;
> +
> +  const size_t messageCount = 10000000;
> +
> +  // NOTE: The total is computed in `size_t` because 10M messages per
> +  // producer no longer fit in an `int` once there are more than 214
> +  // producers.
> +  const size_t totalCount = messageCount * producerCount;
> +
> +  // Every producer enqueues a pointer to this one string. Nothing
> +  // that is dequeued below is dereferenced or freed, so the string
> +  // can live on the stack instead of being heap allocated (and
> +  // leaked); it outlives all threads because they are joined before
> +  // the test returns.
> +  std::string message;
> +  process::MpscLinkedQueue<std::string> q;
> +
> +  Stopwatch consumerWatch;
> +
> +  // The consumer spins until it has received every message; a
> +  // `nullptr` result is treated as "nothing available yet".
> +  std::thread consumer([totalCount, &q, &consumerWatch]() {
> +    consumerWatch.start();
> +    size_t remaining = totalCount;
> +    while (remaining > 0) {
> +      if (q.dequeue() != nullptr) {
> +        --remaining;
> +      }
> +    }
> +    consumerWatch.stop();
> +  });
> +
> +  std::vector<std::thread> producers;
> +  producers.reserve(producerCount);
> +
> +  Stopwatch producerWatch;
> +  producerWatch.start();
> +
> +  for (unsigned int t = 0; t < producerCount; t++) {
> +    producers.emplace_back([messageCount, &message, &q]() {
> +      for (size_t i = 0; i < messageCount; i++) {
> +        q.enqueue(&message);
> +      }
> +    });
> +  }
> +
> +  for (std::thread& producer : producers) {
> +    producer.join();
> +  }
> +
> +  // The producer time covers thread creation up to the last join.
> +  producerWatch.stop();
> +
> +  consumer.join();
> +
> +  Duration producerElapsed = producerWatch.elapsed();
> +  Duration consumerElapsed = consumerWatch.elapsed();
> +
> +  double consumerThroughput =
> +    static_cast<double>(totalCount) / consumerElapsed.secs();
> +  double producerThroughput =
> +    static_cast<double>(totalCount) / producerElapsed.secs();
> +  double throughput = consumerThroughput + producerThroughput;
> +
> +  cout << "Estimated producer throughput (" << producerCount << " threads): "
> +       << std::fixed << producerThroughput << " op/s" << endl;
> +  cout << "Estimated consumer throughput: "
> +       << std::fixed << consumerThroughput << " op/s" << endl;
> +  cout << "Estimated total throughput: "
> +       << std::fixed << throughput << " op/s" << endl;
> +}
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> new file mode 100644
> index 0000000..7699974
> --- /dev/null
> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> @@ -0,0 +1,104 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +//     http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#include <thread>
> +
> +#include <stout/gtest.hpp>
> +#include <stout/stringify.hpp>
> +
> +#include "mpsc_linked_queue.hpp"
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
> +{
> +  // A single element put into the queue must come back out as the
> +  // very same pointer.
> +  process::MpscLinkedQueue<std::string> queue;
> +
> +  std::string* enqueued = new std::string("test");
> +  queue.enqueue(enqueued);
> +
> +  std::string* dequeued = queue.dequeue();
> +  ASSERT_EQ(enqueued, dequeued);
> +
> +  // The test frees the element itself once it has been dequeued.
> +  delete dequeued;
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
> +{
> +  // Elements enqueued from a single thread must be dequeued in the
> +  // order in which they were enqueued.
> +  process::MpscLinkedQueue<std::string> q;
> +  for (int i = 0; i < 20; i++) {
> +    q.enqueue(new std::string(stringify(i)));
> +  }
> +
> +  for (int i = 0; i < 20; i++) {
> +    std::string* s = q.dequeue();
> +
> +    // Fail the test rather than crash on the dereference below if
> +    // the queue comes up short.
> +    ASSERT_TRUE(s != nullptr);
> +
> +    // Copy the value and release the element first so that it is not
> +    // leaked when the assertion fails and returns early.
> +    const std::string value = *s;
> +    delete s;
> +
> +    ASSERT_EQ(stringify(i), value);
> +  }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
> +{
> +  // Five producer threads enqueue 1000 distinct values each. Once
> +  // they have all finished, each of the 5000 values must be dequeued
> +  // exactly once.
> +  process::MpscLinkedQueue<std::string> q;
> +  std::vector<std::thread> threads;
> +  for (int t = 0; t < 5; t++) {
> +    threads.push_back(
> +        std::thread([t, &q]() {
> +          // Each thread covers its own disjoint range of values.
> +          int start = t * 1000;
> +          int end = start + 1000;
> +          for (int i = start; i < end; i++) {
> +            q.enqueue(new std::string(stringify(i)));
> +          }
> +        }));
> +  }
> +
> +  for (std::thread& thread : threads) {
> +    thread.join();
> +  }
> +
> +  std::set<std::string> elements;
> +  size_t dequeued = 0;
> +
> +  std::string* s = nullptr;
> +  while ((s = q.dequeue()) != nullptr) {
> +    elements.insert(*s);
> +    dequeued++;
> +
> +    // As in the other tests in this file, the caller frees what it
> +    // dequeues; without this all 5000 strings would be leaked.
> +    delete s;
> +  }
> +
> +  // The set alone cannot tell whether a value was delivered more
> +  // than once, so the raw number of dequeued elements is checked too.
> +  ASSERT_EQ(5000UL, dequeued);
> +  ASSERT_EQ(5000UL, elements.size());
> +
> +  for (int i = 0; i < 5000; i++) {
> +    ASSERT_NE(elements.end(), elements.find(stringify(i)));
> +  }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, ForEach)
> +{
> +  // `for_each` must visit the elements in the order they were
> +  // enqueued, and must visit all of them.
> +  //
> +  // NOTE(review): The enqueued strings are never dequeued or freed by
> +  // this test; presumably the queue's destructor releases whatever is
> +  // still enqueued. Confirm against `mpsc_linked_queue.hpp`.
> +  process::MpscLinkedQueue<std::string> q;
> +  for (int i = 0; i < 20; i++) {
> +    q.enqueue(new std::string(stringify(i)));
> +  }
> +
> +  int i = 0;
> +  q.for_each([&](std::string* s) {
> +    // NOTE: A failing `ASSERT_*` only returns from this lambda, not
> +    // from the test itself.
> +    ASSERT_EQ(stringify(i++), *s);
> +  });
> +
> +  // Without this check the test would pass vacuously if `for_each`
> +  // never invoked the callback or stopped early.
> +  EXPECT_EQ(20, i);
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, Empty)
> +{
> +  // `empty()` must follow the queue as it goes from empty, to
> +  // holding one element, and back to empty.
> +  process::MpscLinkedQueue<std::string> queue;
> +  ASSERT_TRUE(queue.empty());
> +
> +  std::string* element = new std::string("test");
> +  queue.enqueue(element);
> +  ASSERT_FALSE(queue.empty());
> +
> +  // The returned pointer is not needed; `element` is freed below.
> +  queue.dequeue();
> +  ASSERT_TRUE(queue.empty());
> +
> +  delete element;
> +}
>