You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/04/18 00:47:12 UTC
mesos git commit: Cleaned up the libprocess ping/pong benchmark.
Repository: mesos
Updated Branches:
refs/heads/master 61f08f8c0 -> c0bf80e2b
Cleaned up the libprocess ping/pong benchmark.
Review: https://reviews.apache.org/r/27113
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0bf80e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0bf80e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0bf80e2
Branch: refs/heads/master
Commit: c0bf80e2b25046ac5cf17fd824d9d69983fb6fa9
Parents: 61f08f8
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Apr 17 14:19:23 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Apr 17 15:47:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/tests/benchmarks.cpp | 398 +++++++++++-----------
1 file changed, 192 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0bf80e2/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index a927e4e..0d67148 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -22,25 +22,28 @@
#include <iostream>
#include <memory>
-#include <unordered_set>
+#include <string>
#include <vector>
+#include <process/collect.hpp>
+#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <process/owned.hpp>
#include <process/process.hpp>
+#include <stout/duration.hpp>
+#include <stout/gtest.hpp>
+#include <stout/hashset.hpp>
#include <stout/stopwatch.hpp>
using namespace process;
using std::cout;
using std::endl;
-using std::function;
-using std::istringstream;
+using std::list;
using std::ostringstream;
using std::string;
-using std::unique_ptr;
-using std::unordered_set;
using std::vector;
int main(int argc, char** argv)
@@ -58,250 +61,233 @@ int main(int argc, char** argv)
return RUN_ALL_TESTS();
}
+// TODO(jmlvanre): Factor out the client / server behavior so that we
+// can build separate binaries for the client and the server. That would
+// make it possible to attach performance tools to each one separately.
-class BenchmarkProcess : public Process<BenchmarkProcess>
+// A process that emulates the 'client' side of a ping pong game.
+// An HTTP '/run' request performs a run and returns the time elapsed.
+class ClientProcess : public Process<ClientProcess>
{
public:
- BenchmarkProcess(
- int _iterations = 1,
- int _maxOutstanding = 1,
- const Option<UPID>& _other = Option<UPID>())
- : other(_other),
- counter(0),
- iterations(_iterations),
- maxOutstanding(_maxOutstanding),
- outstanding(0),
- sent(0)
- {
- if (other.isSome()) {
- setLink(other.get());
- }
- }
+ ClientProcess()
+ : running(false),
+ requests(0),
+ responses(0),
+ totalRequests(0),
+ concurrency(0) {}
- virtual ~BenchmarkProcess() {}
+ virtual ~ClientProcess() {}
+protected:
virtual void initialize()
{
- install("ping", &BenchmarkProcess::ping);
- install("pong", &BenchmarkProcess::pong);
+ install("pong", &ClientProcess::pong);
+
+ route("/run", None(), &ClientProcess::run);
}
- void setLink(const UPID& that)
+private:
+ Future<http::Response> run(const http::Request& request)
{
- link(that);
+ if (duration.get() != NULL) {
+ return http::BadRequest("A run is already in progress");
+ }
+
+ hashmap<string, Option<string>> parameters {
+ {"server", request.query.get("server")},
+ {"messageSize", request.query.get("messageSize")},
+ {"requests", request.query.get("requests")},
+ {"concurrency", request.query.get("concurrency")},
+ };
+
+ // Ensure all parameters were provided.
+ foreachpair (const string& parameter,
+ const Option<string>& value,
+ parameters) {
+ if (value.isNone()) {
+ return http::BadRequest("Missing '" + parameter + "' parameter");
+ }
+ }
+
+ server = UPID(parameters["server"].get());
+ link(server);
+
+ Try<Bytes> messageSize = Bytes::parse(parameters["messageSize"].get());
+ if (messageSize.isError()) {
+ return http::BadRequest("Invalid 'messageSize': " + messageSize.error());
+ }
+ message = string(messageSize.get().bytes(), '1');
+
+ Try<size_t> numify_ = numify<size_t>(parameters["requests"].get());
+ if (numify_.isError()) {
+ return http::BadRequest("Invalid 'requests': " + numify_.error());
+ }
+ totalRequests = numify_.get();
+
+ numify_ = numify<size_t>(parameters["concurrency"].get());
+ if (numify_.isError()) {
+ return http::BadRequest("Invalid 'concurrency': " + numify_.error());
+ }
+ concurrency = numify_.get();
+
+ if (concurrency > totalRequests) {
+ concurrency = totalRequests;
+ }
+
+ return _run()
+ .then(lambda::bind(&Self::__run, lambda::_1));
}
- void start()
+ Future<Duration> _run()
{
+ duration = Owned<Promise<Duration> >(new Promise<Duration>());
+
watch.start();
- sendRemaining();
- }
- // Returns the number of rpcs performed per second.
- int await()
- {
- latch.await();
- double elapsed = watch.elapsed().secs();
- return iterations / elapsed;
+ while (requests < concurrency) {
+ send(server, "ping", message.c_str(), message.size());
+ ++requests;
+ }
+
+ return duration->future();
}
-private:
- void ping(const UPID& from, const string& body)
+ // TODO(jmlvanre): Convert to a C++11 lambda.
+ static Future<http::Response> __run(const Duration& duration)
{
- if (linkedPorts.find(from.address.port) == linkedPorts.end()) {
- setLink(from);
- linkedPorts.insert(from.address.port);
- }
- static const string message("hi");
- send(from, "pong", message.c_str(), message.size());
+ return http::OK(stringify(duration));
}
void pong(const UPID& from, const string& body)
{
- ++counter;
- --outstanding;
- if (counter >= iterations) {
- latch.trigger();
- watch.stop();
+ ++responses;
+
+ if (responses == totalRequests) {
+ duration->set(watch.elapsed());
+ duration.reset();
+ } else if (requests < totalRequests) {
+ send(server, "ping", message.c_str(), message.size());
+ ++requests;
}
- sendRemaining();
}
- void sendRemaining()
- {
- static const string message("hi");
- for (; outstanding < maxOutstanding && sent < iterations;
- ++outstanding, ++sent) {
- send(other.get(), "ping", message.c_str(), message.size());
- }
- }
+ bool running;
- Option<UPID> other;
+ // The address of the ponger (server).
+ UPID server;
- Latch latch;
Stopwatch watch;
- int counter;
+ Owned<Promise<Duration>> duration;
- const int iterations;
- const int maxOutstanding;
- int outstanding;
- int sent;
- unordered_set<int> linkedPorts;
-};
+ string message;
+ size_t requests;
+ size_t responses;
-typedef int pipes[2];
+ size_t totalRequests;
+ size_t concurrency;
+};
-void createPipes(pipes& _pipes)
+// A process that emulates the 'server' side of a ping pong game.
+// Note that the server links to any client that communicates with it.
+class ServerProcess : public Process<ServerProcess>
{
- if (pipe(_pipes) < 0) {
- perror("Pipe failed");
- abort();
- }
- Try<Nothing> cloexec = os::cloexec(_pipes[0]);
- if (cloexec.isError()) {
- perror("Cloexec failed on pipe");
- abort();
+public:
+ virtual ~ServerProcess() {}
+
+protected:
+ virtual void initialize()
+ {
+ install("ping", &ServerProcess::ping);
}
- cloexec = os::cloexec(_pipes[1]);
- if (cloexec.isError()) {
- perror("Cloexec failed on pipe");
- abort();
+
+private:
+ void ping(const UPID& from, const string& body)
+ {
+ if (!links.contains(from)) {
+ link(from);
+ links.insert(from);
+ }
+
+ send(from, "pong", body.c_str(), body.size());
}
-}
+ hashset<UPID> links;
+};
+
+// TODO(bmahler): Since there is no forking here, libprocess
+// avoids going through sockets for local messages. Either fork
+// or have the ability to disable local messages in libprocess.
-// Launch numberOfProcesses processes, each with clients 'client'
-// Actors. Play ping pong back and forth between these actors and the
-// main 'server' actor. Each 'client' can have queueDepth ping
-// requests outstanding to the 'server' actor.
-TEST(Process, Process_BENCHMARK_Test)
+// Launches many clients against a central server and measures
+// client throughput.
+TEST(Process, Process_BENCHMARK_ClientServer)
{
- const int iterations = 2500;
- const int queueDepth = 250;
- const int clients = 8;
- const int numberOfProcesses = 4;
-
- vector<int> outPipes;
- vector<int> inPipes;
- vector<pid_t> pids;
- for (int moreToLaunch = numberOfProcesses;
- moreToLaunch > 0; --moreToLaunch) {
- // fork in order to get numberOfProcesses seperate
- // ProcessManagers. This avoids the short-circuit built into
- // ProcessManager for processes communicating in the same manager.
- int requestPipes[2];
- int resultPipes[2];
- pid_t pid = -1;
- createPipes(requestPipes);
- createPipes(resultPipes);
- pid = fork();
-
- if (pid < 0) {
- perror("fork() failed");
- abort();
- } else if (pid == 0) {
- // Child.
-
- // Read the number of bytes about to be parsed.
- int stringSize = 0;
- ssize_t result = read(requestPipes[0], &stringSize, sizeof(stringSize));
- EXPECT_EQ(result, sizeof(stringSize));
- char buffer[stringSize + 1];
- memset(&buffer, 0, stringSize + 1);
-
- // Read in the upid of the 'server' actor.
- result = read(requestPipes[0], &buffer, stringSize);
- EXPECT_EQ(result, stringSize);
- istringstream inStream(buffer);
- UPID other;
- inStream >> other;
-
- // Launch a thread for each client that backs an actor.
- vector<unique_ptr<BenchmarkProcess>> benchmarkProcesses;
- for (int i = 0; i < clients; ++i) {
- BenchmarkProcess* process = new BenchmarkProcess(
- iterations,
- queueDepth,
- other);
- benchmarkProcesses.push_back(unique_ptr<BenchmarkProcess>(process));
- spawn(process);
- process->start();
- }
+ const size_t numRequests = 10000;
+ const size_t concurrency = 250;
+ const size_t numClients = 8;
+ const Bytes messageSize = Bytes(3);
+
+ ServerProcess server;
+ const UPID serverPid = spawn(&server);
+
+ // Launch the clients.
+ vector<Owned<ClientProcess>> clients;
+ for (size_t i = 0; i < numClients; i++) {
+ clients.push_back(Owned<ClientProcess>(new ClientProcess()));
+ spawn(clients.back().get());
+ }
- // Compute the total rpcs per second for this process, write the
- // computation back to the server end of the fork.
- int totalRpcPerSecond = 0;
- foreach (const auto& process, benchmarkProcesses) {
- int rpcPerSecond = process->await();
- totalRpcPerSecond += rpcPerSecond;
- terminate(*process);
- wait(*process);
- }
+ // Start the ping / pongs!
+ const string query = strings::join(
+ "&",
+ "server=" + stringify(serverPid),
+ "requests=" + stringify(numRequests),
+ "concurrency=" + stringify(concurrency),
+ "messageSize=" + stringify(messageSize));
- result = write(
- resultPipes[1],
- &totalRpcPerSecond,
- sizeof(totalRpcPerSecond));
- EXPECT_EQ(result, sizeof(totalRpcPerSecond));
- close(requestPipes[0]);
- exit(0);
- } else {
- // Parent.
-
- // Keep track of the pipes to the child forks. This way the
- // results of their rpc / sec computations can be read back and
- // aggregated.
- outPipes.push_back(requestPipes[1]);
- inPipes.push_back(resultPipes[0]);
- pids.push_back(pid);
-
- // If this is the last child launched, then let the parent
- // become the 'server' actor.
- if (moreToLaunch == 1) {
- BenchmarkProcess process(iterations, queueDepth);
- const UPID pid = spawn(&process);
-
- // Stringify the server pid to send to the child processes.
- ostringstream outStream;
- outStream << pid;
- int stringSize = outStream.str().size();
-
- // For each child, write the size of the stringified pid as
- // well as the stringified pid to the pipe.
- foreach (int fd, outPipes) {
- ssize_t result = write(fd, &stringSize, sizeof(stringSize));
- EXPECT_EQ(result, sizeof(stringSize));
- result = write(fd, outStream.str().c_str(), stringSize);
- EXPECT_EQ(result, stringSize);
- close(fd);
- }
-
- // Read the resulting rpcs / second from the child processes
- // and aggregate the results.
- int totalRpcsPerSecond = 0;
- foreach (int fd, inPipes) {
- int rpcs = 0;
- ssize_t result = read(fd, &rpcs, sizeof(rpcs));
- EXPECT_EQ(result, sizeof(rpcs));
- if (result != sizeof(rpcs)) {
- abort();
- }
- totalRpcsPerSecond += rpcs;
- }
-
- // Wait for all the child forks to terminately gracefully.
- foreach (const auto& p, pids) {
- ::waitpid(p, NULL, 0);
- }
- printf("Total: [%d] rpcs / s\n", totalRpcsPerSecond);
- terminate(process);
- wait(process);
- }
- }
+ Stopwatch watch;
+ watch.start();
+
+ list<Future<http::Response>> futures;
+ foreach (const Owned<ClientProcess>& client, clients) {
+ futures.push_back(http::get(client->self(), "run", query));
+ }
+
+ Future<list<http::Response>> responses = collect(futures);
+ AWAIT_READY(responses);
+
+ Duration elapsed = watch.elapsed();
+
+ // Print the throughput of each client.
+ size_t i = 0;
+ foreach (const http::Response& response, responses.get()) {
+ ASSERT_EQ(http::statuses[200], response.status);
+
+ Try<Duration> elapsed = Duration::parse(response.body);
+ ASSERT_SOME(elapsed);
+ double throughput = numRequests / elapsed.get().secs();
+
+ cout << "Client " << i << ": " << throughput << " rpcs / sec" << endl;
+
+ i++;
+ }
+
+ double throughput = (numRequests * numClients) / elapsed.secs();
+ cout << "Estimated Total: " << throughput << " rpcs / sec" << endl;
+
+ foreach (const Owned<ClientProcess>& client, clients) {
+ terminate(*client);
+ wait(*client);
}
+
+ terminate(server);
+ wait(server);
+
+ return;
}