You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2018/08/07 14:47:41 UTC

[mesos] branch master updated: Replaced exchange in MpscLinkedQueue::dequeue with load/store.

This is an automated email from the ASF dual-hosted git repository.

jpeach pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 57f22aa  Replaced exchange in MpscLinkedQueue::dequeue with load/store.
57f22aa is described below

commit 57f22aad4ce12c607a2547f9801c70eea595cc90
Author: Dario Rexin <dr...@apple.com>
AuthorDate: Mon Aug 6 13:29:08 2018 -0700

    Replaced exchange in MpscLinkedQueue::dequeue with load/store.
    
    There's only a single consumer, so we don't need to exchange
    in dequeue. Testing shows that a load/store pair is much
    more efficient, as it needs fewer CPU cycles. Especially
    in the empty queue case, where the store does not have to
    be executed at all.
    
    Review: https://reviews.apache.org/r/68149/
---
 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 14 ++---
 3rdparty/libprocess/src/tests/benchmarks.cpp  | 79 +++++++++++++++++++++++++++
 2 files changed, 86 insertions(+), 7 deletions(-)

diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
index fc55d43..183e619 100644
--- a/3rdparty/libprocess/src/mpsc_linked_queue.hpp
+++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
@@ -75,7 +75,7 @@ public:
 
     // Exchange is guaranteed to only give the old value to one
     // producer, so this is safe and wait-free.
-    auto oldhead = head.exchange(newNode, std::memory_order_release);
+    auto oldhead = head.exchange(newNode, std::memory_order_acq_rel);
 
     // At this point if this thread context switches out we may block
     // the consumer from doing a dequeue (see below). Eventually we'll
@@ -91,9 +91,7 @@ public:
 
     // Check and see if there is an actual element linked from `tail`
     // since we use `tail` as a "stub" rather than the actual element.
-    auto nextTail = currentTail->next.exchange(
-        nullptr,
-        std::memory_order_relaxed);
+    auto nextTail = currentTail->next.load(std::memory_order_acquire);
 
     // There are three possible cases here:
     //
@@ -115,14 +113,16 @@ public:
       // connect it to the tail, yet, so we spin-wait. At this point
       // we are not wait-free anymore.
       do {
-        nextTail = currentTail->next.exchange(
-            nullptr,
-            std::memory_order_relaxed);
+        nextTail = currentTail->next.load(std::memory_order_acquire);
       } while (nextTail == nullptr);
     }
 
     CHECK_NOTNULL(nextTail);
 
+    // Set next pointer of current tail to null to disconnect it
+    // from the queue.
+    currentTail->next.store(nullptr, std::memory_order_release);
+
     auto element = nextTail->element;
     nextTail->element = nullptr;
 
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 0b94aaf..16b3bca 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -837,3 +837,82 @@ TEST_P(Metrics_BENCHMARK_Test, Scalability)
   std::cout << "Removed " << metrics_count << " counters in "
             << watch.elapsed() << std::endl;
 }
+
+
+TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueueEmpty)
+{
+  const int messageCount = 1000000000;
+  process::MpscLinkedQueue<std::string> q;
+
+  Stopwatch consumerWatch;
+  consumerWatch.start();
+
+  for (int i = messageCount; i > 0; i--) {
+    q.dequeue();
+  }
+
+  consumerWatch.stop();
+
+  Duration consumerElapsed = consumerWatch.elapsed();
+
+  double consumerThroughput = messageCount / consumerElapsed.secs();
+
+  cout << "Estimated consumer throughput: "
+       << std::fixed << consumerThroughput << " op/s" << endl;
+}
+
+
+TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueueNonContendedRead)
+{
+  // NOTE: we set the total number of producers to be 1 less than the
+  // hardware concurrency so the consumer doesn't have to fight for
+  // processing time with the producers.
+  const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
+  const int messageCount = 10000000;
+  const int totalCount = messageCount * producerCount;
+  std::string* s = new std::string("");
+  process::MpscLinkedQueue<std::string> q;
+
+  std::vector<std::thread> producers;
+  for (unsigned int t = 0; t < producerCount; t++) {
+    producers.push_back(std::thread([s, &q]() {
+      for (int i = 0; i < messageCount; i++) {
+        q.enqueue(s);
+      }
+    }));
+  }
+
+  Stopwatch producerWatch;
+  producerWatch.start();
+
+  for (std::thread& producer : producers) {
+    producer.join();
+  }
+
+  producerWatch.stop();
+
+  Stopwatch consumerWatch;
+  consumerWatch.start();
+
+  for (int i = totalCount; i > 0;) {
+    if (q.dequeue() != nullptr) {
+      i--;
+    }
+  }
+
+  consumerWatch.stop();
+
+  Duration producerElapsed = producerWatch.elapsed();
+  Duration consumerElapsed = consumerWatch.elapsed();
+
+  double consumerThroughput = totalCount / consumerElapsed.secs();
+  double producerThroughput = totalCount / producerElapsed.secs();
+  double throughput = consumerThroughput + producerThroughput;
+
+  cout << "Estimated producer throughput (" << producerCount << " threads): "
+       << std::fixed << producerThroughput << " op/s" << endl;
+  cout << "Estimated consumer throughput: "
+       << std::fixed << consumerThroughput << " op/s" << endl;
+  cout << "Estimated total throughput: "
+       << std::fixed << throughput << " op/s" << endl;
+}