You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/04/18 00:47:12 UTC

mesos git commit: Cleaned up the libprocess ping/pong benchmark.

Repository: mesos
Updated Branches:
  refs/heads/master 61f08f8c0 -> c0bf80e2b


Cleaned up the libprocess ping/pong benchmark.

Review: https://reviews.apache.org/r/27113


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0bf80e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0bf80e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0bf80e2

Branch: refs/heads/master
Commit: c0bf80e2b25046ac5cf17fd824d9d69983fb6fa9
Parents: 61f08f8
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Apr 17 14:19:23 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Apr 17 15:47:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/tests/benchmarks.cpp | 398 +++++++++++-----------
 1 file changed, 192 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c0bf80e2/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index a927e4e..0d67148 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -22,25 +22,28 @@
 
 #include <iostream>
 #include <memory>
-#include <unordered_set>
+#include <string>
 #include <vector>
 
+#include <process/collect.hpp>
+#include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
+#include <process/owned.hpp>
 #include <process/process.hpp>
 
+#include <stout/duration.hpp>
+#include <stout/gtest.hpp>
+#include <stout/hashset.hpp>
 #include <stout/stopwatch.hpp>
 
 using namespace process;
 
 using std::cout;
 using std::endl;
-using std::function;
-using std::istringstream;
+using std::list;
 using std::ostringstream;
 using std::string;
-using std::unique_ptr;
-using std::unordered_set;
 using std::vector;
 
 int main(int argc, char** argv)
@@ -58,250 +61,233 @@ int main(int argc, char** argv)
   return RUN_ALL_TESTS();
 }
 
+// TODO(jmlvanre): Factor out the client / server behavior so that we
+// can make separate binaries for the client and server. This is
+// useful to attach performance tools to them separately.
 
-class BenchmarkProcess : public Process<BenchmarkProcess>
+// A process that emulates the 'client' side of a ping pong game.
+// An HTTP '/run' request performs a run and returns the time elapsed.
+class ClientProcess : public Process<ClientProcess>
 {
 public:
-  BenchmarkProcess(
-      int _iterations = 1,
-      int _maxOutstanding = 1,
-      const Option<UPID>& _other = Option<UPID>())
-    : other(_other),
-      counter(0),
-      iterations(_iterations),
-      maxOutstanding(_maxOutstanding),
-      outstanding(0),
-      sent(0)
-  {
-    if (other.isSome()) {
-      setLink(other.get());
-    }
-  }
+  ClientProcess()
+    : running(false),
+      requests(0),
+      responses(0),
+      totalRequests(0),
+      concurrency(0) {}
 
-  virtual ~BenchmarkProcess() {}
+  virtual ~ClientProcess() {}
 
+protected:
   virtual void initialize()
   {
-    install("ping", &BenchmarkProcess::ping);
-    install("pong", &BenchmarkProcess::pong);
+    install("pong", &ClientProcess::pong);
+
+    route("/run", None(), &ClientProcess::run);
   }
 
-  void setLink(const UPID& that)
+private:
+  Future<http::Response> run(const http::Request& request)
   {
-    link(that);
+    if (duration.get() != NULL) {
+      return http::BadRequest("A run is already in progress");
+    }
+
+    hashmap<string, Option<string>> parameters {
+      {"server", request.query.get("server")},
+      {"messageSize", request.query.get("messageSize")},
+      {"requests", request.query.get("requests")},
+      {"concurrency", request.query.get("concurrency")},
+    };
+
+    // Ensure all parameters were provided.
+    foreachpair (const string& parameter,
+                 const Option<string>& value,
+                 parameters) {
+      if (value.isNone()) {
+        return http::BadRequest("Missing '" + parameter + "' parameter");
+      }
+    }
+
+    server = UPID(parameters["server"].get());
+    link(server);
+
+    Try<Bytes> messageSize = Bytes::parse(parameters["messageSize"].get());
+    if (messageSize.isError()) {
+      return http::BadRequest("Invalid 'messageSize': " + messageSize.error());
+    }
+    message = string(messageSize.get().bytes(), '1');
+
+    Try<size_t> numify_ = numify<size_t>(parameters["requests"].get());
+    if (numify_.isError()) {
+      return http::BadRequest("Invalid 'requests': " + numify_.error());
+    }
+    totalRequests = numify_.get();
+
+    numify_ = numify<size_t>(parameters["concurrency"].get());
+    if (numify_.isError()) {
+      return http::BadRequest("Invalid 'concurrency': " + numify_.error());
+    }
+    concurrency = numify_.get();
+
+    if (concurrency > totalRequests) {
+      concurrency = totalRequests;
+    }
+
+    return _run()
+      .then(lambda::bind(&Self::__run, lambda::_1));
   }
 
-  void start()
+  Future<Duration> _run()
   {
+    duration = Owned<Promise<Duration> >(new Promise<Duration>());
+
     watch.start();
-    sendRemaining();
-  }
 
-  // Returns the number of rpcs performed per second.
-  int await()
-  {
-    latch.await();
-    double elapsed = watch.elapsed().secs();
-    return iterations / elapsed;
+    while (requests < concurrency) {
+      send(server, "ping", message.c_str(), message.size());
+      ++requests;
+    }
+
+    return duration->future();
   }
 
-private:
-  void ping(const UPID& from, const string& body)
+  // TODO(jmlvanre): Convert to a C++11 lambda.
+  static Future<http::Response> __run(const Duration& duration)
   {
-    if (linkedPorts.find(from.address.port) == linkedPorts.end()) {
-      setLink(from);
-      linkedPorts.insert(from.address.port);
-    }
-    static const string message("hi");
-    send(from, "pong", message.c_str(), message.size());
+    return http::OK(stringify(duration));
   }
 
   void pong(const UPID& from, const string& body)
   {
-    ++counter;
-    --outstanding;
-    if (counter >= iterations) {
-      latch.trigger();
-      watch.stop();
+    ++responses;
+
+    if (responses == totalRequests) {
+      duration->set(watch.elapsed());
+      duration.reset();
+    } else if (requests < totalRequests) {
+      send(server, "ping", message.c_str(), message.size());
+      ++requests;
     }
-    sendRemaining();
   }
 
-  void sendRemaining()
-  {
-    static const string message("hi");
-    for (; outstanding < maxOutstanding && sent < iterations;
-         ++outstanding, ++sent) {
-      send(other.get(), "ping", message.c_str(), message.size());
-    }
-  }
+  bool running;
 
-  Option<UPID> other;
+  // The address of the ponger (server).
+  UPID server;
 
-  Latch latch;
   Stopwatch watch;
 
-  int counter;
+  Owned<Promise<Duration>> duration;
 
-  const int iterations;
-  const int maxOutstanding;
-  int outstanding;
-  int sent;
-  unordered_set<int> linkedPorts;
-};
+  string message;
 
+  size_t requests;
+  size_t responses;
 
-typedef int pipes[2];
+  size_t totalRequests;
+  size_t concurrency;
+};
 
 
-void createPipes(pipes& _pipes)
+// A process that emulates the 'server' side of a ping pong game.
+// Note that the server links to any client that communicates with it.
+class ServerProcess : public Process<ServerProcess>
 {
-  if (pipe(_pipes) < 0) {
-    perror("Pipe failed");
-    abort();
-  }
-  Try<Nothing> cloexec = os::cloexec(_pipes[0]);
-  if (cloexec.isError()) {
-    perror("Cloexec failed on pipe");
-    abort();
+public:
+  virtual ~ServerProcess() {}
+
+protected:
+  virtual void initialize()
+  {
+    install("ping", &ServerProcess::ping);
   }
-  cloexec = os::cloexec(_pipes[1]);
-  if (cloexec.isError()) {
-    perror("Cloexec failed on pipe");
-    abort();
+
+private:
+  void ping(const UPID& from, const string& body)
+  {
+    if (!links.contains(from)) {
+      link(from);
+      links.insert(from);
+    }
+
+    send(from, "pong", body.c_str(), body.size());
   }
-}
 
+  hashset<UPID> links;
+};
+
+// TODO(bmahler): Since there is no forking here, libprocess
+// avoids going through sockets for local messages. Either fork
+// or have the ability to disable local messages in libprocess.
 
-// Launch numberOfProcesses processes, each with clients 'client'
-// Actors. Play ping pong back and forth between these actors and the
-// main 'server' actor. Each 'client' can have queueDepth ping
-// requests outstanding to the 'server' actor.
-TEST(Process, Process_BENCHMARK_Test)
+// Launches many clients against a central server and measures
+// client throughput.
+TEST(Process, Process_BENCHMARK_ClientServer)
 {
-  const int iterations = 2500;
-  const int queueDepth = 250;
-  const int clients = 8;
-  const int numberOfProcesses = 4;
-
-  vector<int> outPipes;
-  vector<int> inPipes;
-  vector<pid_t> pids;
-  for (int moreToLaunch = numberOfProcesses;
-       moreToLaunch > 0; --moreToLaunch) {
-    // fork in order to get numberOfProcesses seperate
-    // ProcessManagers. This avoids the short-circuit built into
-    // ProcessManager for processes communicating in the same manager.
-    int requestPipes[2];
-    int resultPipes[2];
-    pid_t pid = -1;
-    createPipes(requestPipes);
-    createPipes(resultPipes);
-    pid = fork();
-
-    if (pid < 0) {
-      perror("fork() failed");
-      abort();
-    } else if (pid == 0) {
-      // Child.
-
-      // Read the number of bytes about to be parsed.
-      int stringSize = 0;
-      ssize_t result = read(requestPipes[0], &stringSize, sizeof(stringSize));
-      EXPECT_EQ(result, sizeof(stringSize));
-      char buffer[stringSize + 1];
-      memset(&buffer, 0, stringSize + 1);
-
-      // Read in the upid of the 'server' actor.
-      result = read(requestPipes[0], &buffer, stringSize);
-      EXPECT_EQ(result, stringSize);
-      istringstream inStream(buffer);
-      UPID other;
-      inStream >> other;
-
-      // Launch a thread for each client that backs an actor.
-      vector<unique_ptr<BenchmarkProcess>> benchmarkProcesses;
-      for (int i = 0; i < clients; ++i) {
-        BenchmarkProcess* process = new BenchmarkProcess(
-            iterations,
-            queueDepth,
-            other);
-        benchmarkProcesses.push_back(unique_ptr<BenchmarkProcess>(process));
-        spawn(process);
-        process->start();
-      }
+  const size_t numRequests = 10000;
+  const size_t concurrency = 250;
+  const size_t numClients = 8;
+  const Bytes messageSize = Bytes(3);
+
+  ServerProcess server;
+  const UPID serverPid = spawn(&server);
+
+  // Launch the clients.
+  vector<Owned<ClientProcess>> clients;
+  for (size_t i = 0; i < numClients; i++) {
+    clients.push_back(Owned<ClientProcess>(new ClientProcess()));
+    spawn(clients.back().get());
+  }
 
-      // Compute the total rpcs per second for this process, write the
-      // computation back to the server end of the fork.
-      int totalRpcPerSecond = 0;
-      foreach (const auto& process, benchmarkProcesses) {
-        int rpcPerSecond = process->await();
-        totalRpcPerSecond += rpcPerSecond;
-        terminate(*process);
-        wait(*process);
-      }
+  // Start the ping / pongs!
+  const string query = strings::join(
+      "&",
+      "server=" + stringify(serverPid),
+      "requests=" + stringify(numRequests),
+      "concurrency=" + stringify(concurrency),
+      "messageSize=" + stringify(messageSize));
 
-      result = write(
-          resultPipes[1],
-          &totalRpcPerSecond,
-          sizeof(totalRpcPerSecond));
-      EXPECT_EQ(result, sizeof(totalRpcPerSecond));
-      close(requestPipes[0]);
-      exit(0);
-    } else {
-      // Parent.
-
-      // Keep track of the pipes to the child forks. This way the
-      // results of their rpc / sec computations can be read back and
-      // aggregated.
-      outPipes.push_back(requestPipes[1]);
-      inPipes.push_back(resultPipes[0]);
-      pids.push_back(pid);
-
-      // If this is the last child launched, then let the parent
-      // become the 'server' actor.
-      if (moreToLaunch == 1) {
-        BenchmarkProcess process(iterations, queueDepth);
-        const UPID pid = spawn(&process);
-
-        // Stringify the server pid to send to the child processes.
-        ostringstream outStream;
-        outStream << pid;
-        int stringSize = outStream.str().size();
-
-        // For each child, write the size of the stringified pid as
-        // well as the stringified pid to the pipe.
-        foreach (int fd, outPipes) {
-          ssize_t result = write(fd, &stringSize, sizeof(stringSize));
-          EXPECT_EQ(result, sizeof(stringSize));
-          result = write(fd, outStream.str().c_str(), stringSize);
-          EXPECT_EQ(result, stringSize);
-          close(fd);
-        }
-
-        // Read the resulting rpcs / second from the child processes
-        // and aggregate the results.
-        int totalRpcsPerSecond = 0;
-        foreach (int fd, inPipes) {
-          int rpcs = 0;
-          ssize_t result = read(fd, &rpcs, sizeof(rpcs));
-          EXPECT_EQ(result, sizeof(rpcs));
-          if (result != sizeof(rpcs)) {
-            abort();
-          }
-          totalRpcsPerSecond += rpcs;
-        }
-
-        // Wait for all the child forks to terminately gracefully.
-        foreach (const auto& p, pids) {
-          ::waitpid(p, NULL, 0);
-        }
-        printf("Total: [%d] rpcs / s\n", totalRpcsPerSecond);
-        terminate(process);
-        wait(process);
-      }
-    }
+  Stopwatch watch;
+  watch.start();
+
+  list<Future<http::Response>> futures;
+  foreach (const Owned<ClientProcess>& client, clients) {
+    futures.push_back(http::get(client->self(), "run", query));
+  }
+
+  Future<list<http::Response>> responses = collect(futures);
+  AWAIT_READY(responses);
+
+  Duration elapsed = watch.elapsed();
+
+  // Print the throughput of each client.
+  size_t i = 0;
+  foreach (const http::Response& response, responses.get()) {
+    ASSERT_EQ(http::statuses[200], response.status);
+
+    Try<Duration> elapsed = Duration::parse(response.body);
+    ASSERT_SOME(elapsed);
+    double throughput = numRequests / elapsed.get().secs();
+
+    cout << "Client " << i << ": " << throughput << " rpcs / sec" << endl;
+
+    i++;
+  }
+
+  double throughput = (numRequests * numClients) / elapsed.secs();
+  cout << "Estimated Total: " << throughput << " rpcs / sec" << endl;
+
+  foreach (const Owned<ClientProcess>& client, clients) {
+    terminate(*client);
+    wait(*client);
   }
+
+  terminate(server);
+  wait(server);
+
+  return;
 }