You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/07/20 16:11:15 UTC

[1/2] mesos git commit: Added a message passing benchmark.

Repository: mesos
Updated Branches:
  refs/heads/master ef6622589 -> 818aa53ac


Added a message passing benchmark.

Review: https://reviews.apache.org/r/60983


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7c90f186
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7c90f186
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7c90f186

Branch: refs/heads/master
Commit: 7c90f18603223474652722c78e46b0aa65a528e5
Parents: ef66225
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Jul 19 13:56:28 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jul 20 09:10:01 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   1 +
 3rdparty/libprocess/include/Makefile.am         |   1 +
 .../include/process/count_down_latch.hpp        |  54 +++++++++
 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
 3rdparty/libprocess/src/tests/benchmarks.cpp    | 116 +++++++++++++++++++
 .../src/tests/count_down_latch_tests.cpp        |  70 +++++++++++
 6 files changed, 243 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 4031297..378a434 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -249,6 +249,7 @@ check_PROGRAMS = libprocess-tests benchmarks
 libprocess_tests_SOURCES =					\
   src/tests/after_tests.cpp					\
   src/tests/collect_tests.cpp					\
+  src/tests/count_down_latch_tests.cpp				\
   src/tests/decoder_tests.cpp					\
   src/tests/encoder_tests.cpp					\
   src/tests/future_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index b452053..637654e 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -19,6 +19,7 @@ nobase_include_HEADERS =		\
   process/check.hpp			\
   process/clock.hpp			\
   process/collect.hpp			\
+  process/count_down_latch.hpp		\
   process/defer.hpp			\
   process/deferred.hpp			\
   process/delay.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/include/process/count_down_latch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/count_down_latch.hpp b/3rdparty/libprocess/include/process/count_down_latch.hpp
new file mode 100644
index 0000000..0b6ac00
--- /dev/null
+++ b/3rdparty/libprocess/include/process/count_down_latch.hpp
@@ -0,0 +1,54 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_COUNT_DOWN_LATCH_HPP__
+#define __PROCESS_COUNT_DOWN_LATCH_HPP__
+
+#include <atomic>
+
+#include <process/future.hpp>
+
+namespace process {
+
+// An implementation of a count down latch that returns a Future to
+// signify when it gets triggered.
+class CountDownLatch
+{
+public:
+  CountDownLatch(size_t count = 1) : count(count) {}
+
+  void decrement()
+  {
+    size_t expected = count.load();
+    while (expected > 0) {
+      if (count.compare_exchange_strong(expected, expected - 1)) {
+        if (expected == 1) {
+          promise.set(Nothing());
+        }
+        break;
+      }
+    }
+  }
+
+  Future<Nothing> triggered()
+  {
+    return promise.future();
+  }
+
+private:
+  std::atomic<size_t> count;
+  Promise<Nothing> promise;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_COUNT_DOWN_LATCH_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
index 3fc8d98..27451c2 100644
--- a/3rdparty/libprocess/src/tests/CMakeLists.txt
+++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
@@ -21,6 +21,7 @@ set(PROCESS_TESTS_SRC
   main.cpp
   after_tests.cpp
   collect_tests.cpp
+  count_down_latch_tests.cpp
   decoder_tests.cpp
   encoder_tests.cpp
   future_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 8e8f426..1ef8c4b 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include <process/collect.hpp>
+#include <process/count_down_latch.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -37,7 +38,9 @@
 
 namespace http = process::http;
 
+using process::CountDownLatch;
 using process::Future;
+using process::MessageEvent;
 using process::Owned;
 using process::Process;
 using process::ProcessBase;
@@ -369,3 +372,116 @@ TEST(ProcessTest, Process_BENCHMARK_LargeNumberOfLinks)
     delete process;
   }
 }
+
+
+class Destination : public Process<Destination>
+{
+protected:
+  virtual void visit(const MessageEvent& event)
+  {
+    if (event.message.name == "ping") {
+      send(event.message.from, "pong");
+    }
+  }
+};
+
+
+class Client : public Process<Client>
+{
+public:
+  Client(const UPID& destination, CountDownLatch* latch, long repeat)
+    : destination(destination), latch(latch), repeat(repeat) {}
+
+protected:
+  virtual void visit(const MessageEvent& event)
+  {
+    if (event.message.name == "pong") {
+      received += 1;
+      if (sent < repeat) {
+        send(destination, "ping");
+        sent += 1;
+      } else if (received >= repeat) {
+        latch->decrement();
+      }
+    } else if (event.message.name == "run") {
+      for (long l = 0; l < std::min(1000L, repeat); l++) {
+        send(destination, "ping");
+        sent += 1;
+      }
+    }
+  }
+
+private:
+  UPID destination;
+  CountDownLatch* latch;
+  long repeat;
+  long sent = 0L;
+  long received = 0L;
+};
+
+
+// See
+// https://github.com/akka/akka/blob/7ac37e7536547c57ab639ed8746c7b4e5ff2f69b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala
+// for the inspiration for this benchmark (this file was deleted in
+// this commit:
+// https://github.com/akka/akka/commit/a02e138f3bc7c21c2b2511ea19203a52d74584d5).
+//
+// This benchmark was discussed here:
+// http://letitcrash.com/post/17607272336/scalability-of-fork-join-pool
+TEST(ProcessTest, Process_BENCHMARK_ThroughputPerformance)
+{
+  long repeatFactor = 500L;
+  long defaultRepeat = 30000L * repeatFactor;
+
+  const size_t numberOfClients = 4;
+
+  CountDownLatch latch(numberOfClients - 1);
+
+  long repeat = defaultRepeat;
+
+  auto repeatsPerClient = repeat / numberOfClients;
+
+  vector<Owned<Destination>> destinations;
+  vector<Owned<Client>> clients;
+
+  for (size_t i = 0; i < numberOfClients; i++) {
+    Owned<Destination> destination(new Destination());
+
+    spawn(*destination);
+
+    Owned<Client> client(new Client(
+        destination->self(),
+        &latch,
+        repeatsPerClient));
+
+    spawn(*client);
+
+    destinations.push_back(destination);
+    clients.push_back(client);
+  }
+
+  Stopwatch watch;
+  watch.start();
+
+  foreach (const Owned<Client>& client, clients) {
+    post(client->self(), "run");
+  }
+
+  AWAIT_READY(latch.triggered());
+
+  Duration elapsed = watch.elapsed();
+
+  double throughput = (double) repeat / elapsed.secs();
+
+  cout << "Estimated Total: " << std::fixed << throughput << endl;
+
+  foreach (const Owned<Client>& client, clients) {
+    terminate(client->self());
+    wait(client->self());
+  }
+
+  foreach (const Owned<Destination>& destination, destinations) {
+    terminate(destination->self());
+    wait(destination->self());
+  }
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp b/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp
new file mode 100644
index 0000000..94ffaa4
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp
@@ -0,0 +1,70 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <thread>
+#include <vector>
+
+#include <process/count_down_latch.hpp>
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+
+using process::CountDownLatch;
+using process::Future;
+
+
+TEST(CountDownLatchTest, Triggered)
+{
+  CountDownLatch latch(5);
+
+  Future<Nothing> triggered = latch.triggered();
+
+  latch.decrement();
+
+  EXPECT_TRUE(triggered.isPending());
+
+  latch.decrement();
+
+  EXPECT_TRUE(triggered.isPending());
+
+  latch.decrement();
+
+  EXPECT_TRUE(triggered.isPending());
+
+  latch.decrement();
+
+  EXPECT_TRUE(triggered.isPending());
+
+  latch.decrement();
+
+  AWAIT_READY(triggered);
+}
+
+
+TEST(CountDownLatchTest, Threads)
+{
+  CountDownLatch latch(5);
+
+  std::vector<std::thread> threads;
+
+  for (size_t i = 0; i < 5; i++) {
+    threads.emplace_back(
+        [&]() {
+          latch.decrement();
+        });
+  }
+
+  foreach (std::thread& thread, threads) {
+    thread.join();
+  }
+
+  AWAIT_READY(latch.triggered());
+}


[2/2] mesos git commit: Finalize when the tests finish.

Posted by be...@apache.org.
Finalize when the tests finish.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/818aa53a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/818aa53a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/818aa53a

Branch: refs/heads/master
Commit: 818aa53acfae579a6aecf7016c675c7a389a9ed6
Parents: 7c90f18
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Jul 19 19:25:42 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jul 20 09:10:12 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/tests/benchmarks.cpp | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/818aa53a/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 1ef8c4b..694a842 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -66,7 +66,10 @@ int main(int argc, char** argv)
   listeners.Append(process::ClockTestEventListener::instance());
   listeners.Append(process::FilterTestEventListener::instance());
 
-  return RUN_ALL_TESTS();
+  int result = RUN_ALL_TESTS();
+
+  process::finalize(true);
+  return result;
 }
 
 // TODO(jmlvanre): Factor out the client / server behavior so that we