You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/07/20 16:11:15 UTC
[1/2] mesos git commit: Added a message passing benchmark.
Repository: mesos
Updated Branches:
refs/heads/master ef6622589 -> 818aa53ac
Added a message passing benchmark.
Review: https://reviews.apache.org/r/60983
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7c90f186
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7c90f186
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7c90f186
Branch: refs/heads/master
Commit: 7c90f18603223474652722c78e46b0aa65a528e5
Parents: ef66225
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Jul 19 13:56:28 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jul 20 09:10:01 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/include/Makefile.am | 1 +
.../include/process/count_down_latch.hpp | 54 +++++++++
3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
3rdparty/libprocess/src/tests/benchmarks.cpp | 116 +++++++++++++++++++
.../src/tests/count_down_latch_tests.cpp | 70 +++++++++++
6 files changed, 243 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 4031297..378a434 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -249,6 +249,7 @@ check_PROGRAMS = libprocess-tests benchmarks
libprocess_tests_SOURCES = \
src/tests/after_tests.cpp \
src/tests/collect_tests.cpp \
+ src/tests/count_down_latch_tests.cpp \
src/tests/decoder_tests.cpp \
src/tests/encoder_tests.cpp \
src/tests/future_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index b452053..637654e 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -19,6 +19,7 @@ nobase_include_HEADERS = \
process/check.hpp \
process/clock.hpp \
process/collect.hpp \
+ process/count_down_latch.hpp \
process/defer.hpp \
process/deferred.hpp \
process/delay.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/include/process/count_down_latch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/count_down_latch.hpp b/3rdparty/libprocess/include/process/count_down_latch.hpp
new file mode 100644
index 0000000..0b6ac00
--- /dev/null
+++ b/3rdparty/libprocess/include/process/count_down_latch.hpp
@@ -0,0 +1,54 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_COUNT_DOWN_LATCH_HPP__
+#define __PROCESS_COUNT_DOWN_LATCH_HPP__
+
+#include <atomic>
+
+#include <process/future.hpp>
+
+namespace process {
+
+// An implementation of a count down latch that returns a Future to
+// signify when it gets triggered.
+class CountDownLatch
+{
+public:
+ CountDownLatch(size_t count = 1) : count(count) {}
+
+ void decrement()
+ {
+ size_t expected = count.load();
+ while (expected > 0) {
+ if (count.compare_exchange_strong(expected, expected - 1)) {
+ if (expected == 1) {
+ promise.set(Nothing());
+ }
+ break;
+ }
+ }
+ }
+
+ Future<Nothing> triggered()
+ {
+ return promise.future();
+ }
+
+private:
+ std::atomic<size_t> count;
+ Promise<Nothing> promise;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_COUNT_DOWN_LATCH_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
index 3fc8d98..27451c2 100644
--- a/3rdparty/libprocess/src/tests/CMakeLists.txt
+++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
@@ -21,6 +21,7 @@ set(PROCESS_TESTS_SRC
main.cpp
after_tests.cpp
collect_tests.cpp
+ count_down_latch_tests.cpp
decoder_tests.cpp
encoder_tests.cpp
future_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 8e8f426..1ef8c4b 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -24,6 +24,7 @@
#include <vector>
#include <process/collect.hpp>
+#include <process/count_down_latch.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
@@ -37,7 +38,9 @@
namespace http = process::http;
+using process::CountDownLatch;
using process::Future;
+using process::MessageEvent;
using process::Owned;
using process::Process;
using process::ProcessBase;
@@ -369,3 +372,116 @@ TEST(ProcessTest, Process_BENCHMARK_LargeNumberOfLinks)
delete process;
}
}
+
+
+class Destination : public Process<Destination>
+{
+protected:
+ virtual void visit(const MessageEvent& event)
+ {
+ if (event.message.name == "ping") {
+ send(event.message.from, "pong");
+ }
+ }
+};
+
+
+class Client : public Process<Client>
+{
+public:
+ Client(const UPID& destination, CountDownLatch* latch, long repeat)
+ : destination(destination), latch(latch), repeat(repeat) {}
+
+protected:
+ virtual void visit(const MessageEvent& event)
+ {
+ if (event.message.name == "pong") {
+ received += 1;
+ if (sent < repeat) {
+ send(destination, "ping");
+ sent += 1;
+ } else if (received >= repeat) {
+ latch->decrement();
+ }
+ } else if (event.message.name == "run") {
+ for (long l = 0; l < std::min(1000L, repeat); l++) {
+ send(destination, "ping");
+ sent += 1;
+ }
+ }
+ }
+
+private:
+ UPID destination;
+ CountDownLatch* latch;
+ long repeat;
+ long sent = 0L;
+ long received = 0L;
+};
+
+
+// See
+// https://github.com/akka/akka/blob/7ac37e7536547c57ab639ed8746c7b4e5ff2f69b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala
+// for the inspiration for this benchmark (this file was deleted in
+// this commit:
+// https://github.com/akka/akka/commit/a02e138f3bc7c21c2b2511ea19203a52d74584d5).
+//
+// This benchmark was discussed here:
+// http://letitcrash.com/post/17607272336/scalability-of-fork-join-pool
+TEST(ProcessTest, Process_BENCHMARK_ThroughputPerformance)
+{
+ long repeatFactor = 500L;
+ long defaultRepeat = 30000L * repeatFactor;
+
+ const size_t numberOfClients = 4;
+
+ CountDownLatch latch(numberOfClients - 1);
+
+ long repeat = defaultRepeat;
+
+ auto repeatsPerClient = repeat / numberOfClients;
+
+ vector<Owned<Destination>> destinations;
+ vector<Owned<Client>> clients;
+
+ for (size_t i = 0; i < numberOfClients; i++) {
+ Owned<Destination> destination(new Destination());
+
+ spawn(*destination);
+
+ Owned<Client> client(new Client(
+ destination->self(),
+ &latch,
+ repeatsPerClient));
+
+ spawn(*client);
+
+ destinations.push_back(destination);
+ clients.push_back(client);
+ }
+
+ Stopwatch watch;
+ watch.start();
+
+ foreach (const Owned<Client>& client, clients) {
+ post(client->self(), "run");
+ }
+
+ AWAIT_READY(latch.triggered());
+
+ Duration elapsed = watch.elapsed();
+
+ double throughput = (double) repeat / elapsed.secs();
+
+ cout << "Estimated Total: " << std::fixed << throughput << endl;
+
+ foreach (const Owned<Client>& client, clients) {
+ terminate(client->self());
+ wait(client->self());
+ }
+
+ foreach (const Owned<Destination>& destination, destinations) {
+ terminate(destination->self());
+ wait(destination->self());
+ }
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7c90f186/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp b/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp
new file mode 100644
index 0000000..94ffaa4
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/count_down_latch_tests.cpp
@@ -0,0 +1,70 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <thread>
+#include <vector>
+
+#include <process/count_down_latch.hpp>
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+
+using process::CountDownLatch;
+using process::Future;
+
+
+TEST(CountDownLatchTest, Triggered)
+{
+ CountDownLatch latch(5);
+
+ Future<Nothing> triggered = latch.triggered();
+
+ latch.decrement();
+
+ EXPECT_TRUE(triggered.isPending());
+
+ latch.decrement();
+
+ EXPECT_TRUE(triggered.isPending());
+
+ latch.decrement();
+
+ EXPECT_TRUE(triggered.isPending());
+
+ latch.decrement();
+
+ EXPECT_TRUE(triggered.isPending());
+
+ latch.decrement();
+
+ AWAIT_READY(triggered);
+}
+
+
+TEST(CountDownLatchTest, Threads)
+{
+ CountDownLatch latch(5);
+
+ std::vector<std::thread> threads;
+
+ for (size_t i = 0; i < 5; i++) {
+ threads.emplace_back(
+ [&]() {
+ latch.decrement();
+ });
+ }
+
+ foreach (std::thread& thread, threads) {
+ thread.join();
+ }
+
+ AWAIT_READY(latch.triggered());
+}
[2/2] mesos git commit: Finalize when the tests finish.
Posted by be...@apache.org.
Finalize when the tests finish.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/818aa53a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/818aa53a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/818aa53a
Branch: refs/heads/master
Commit: 818aa53acfae579a6aecf7016c675c7a389a9ed6
Parents: 7c90f18
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Jul 19 19:25:42 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jul 20 09:10:12 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/tests/benchmarks.cpp | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/818aa53a/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index 1ef8c4b..694a842 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -66,7 +66,10 @@ int main(int argc, char** argv)
listeners.Append(process::ClockTestEventListener::instance());
listeners.Append(process::FilterTestEventListener::instance());
- return RUN_ALL_TESTS();
+ int result = RUN_ALL_TESTS();
+
+ process::finalize(true);
+ return result;
}
// TODO(jmlvanre): Factor out the client / server behavior so that we