You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/11/20 17:38:48 UTC

[mesos] branch master updated: Added MasterActorResponsiveness_BENCHMARK_Test.

This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 40dc508  Added MasterActorResponsiveness_BENCHMARK_Test.
40dc508 is described below

commit 40dc508d59d547e867746bc6b5b82ced849687f8
Author: Alexander Rukletsov <ru...@gmail.com>
AuthorDate: Sun Nov 18 05:09:39 2018 +0100

    Added MasterActorResponsiveness_BENCHMARK_Test.
    
    See summary.
    
    Review: https://reviews.apache.org/r/68131/
---
 src/tests/master_benchmarks.cpp | 233 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 232 insertions(+), 1 deletion(-)

diff --git a/src/tests/master_benchmarks.cpp b/src/tests/master_benchmarks.cpp
index b6d6dc7..c9ee509 100644
--- a/src/tests/master_benchmarks.cpp
+++ b/src/tests/master_benchmarks.cpp
@@ -14,6 +14,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <atomic>
+#include <limits>
+#include <memory>
 #include <string>
 #include <tuple>
 #include <vector>
@@ -21,13 +24,17 @@
 #include <mesos/resources.hpp>
 #include <mesos/version.hpp>
 
+#include <process/async.hpp>
 #include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/future.hpp>
+#include <process/loop.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
+#include <process/statistics.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/stopwatch.hpp>
 
 #include "common/protobuf_utils.hpp"
@@ -36,21 +43,32 @@
 
 namespace http = process::http;
 
+using process::async;
 using process::await;
+using process::collect;
+using process::Break;
 using process::Clock;
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
+using process::loop;
 using process::Owned;
 using process::PID;
 using process::ProcessBase;
 using process::Promise;
 using process::spawn;
+using process::Statistics;
 using process::terminate;
 using process::UPID;
+using process::wait;
 
+using std::atomic_bool;
 using std::cout;
 using std::endl;
 using std::make_tuple;
+using std::numeric_limits;
+using std::shared_ptr;
 using std::string;
 using std::tie;
 using std::tuple;
@@ -237,7 +255,7 @@ public:
   ~TestSlave()
   {
     terminate(process.get());
-    process::wait(process.get());
+    wait(process.get());
   }
 
   Future<Nothing> reregister()
@@ -482,6 +500,219 @@ TEST_P(MasterStateQuery_BENCHMARK_Test, GetState)
 }
 
 
+class MasterActorResponsiveness_BENCHMARK_Test
+  : public MesosTest,
+    public WithParamInterface<tuple<
+      size_t, size_t, size_t, size_t, size_t, size_t, size_t>> {};
+
+
+INSTANTIATE_TEST_CASE_P(
+    AgentFrameworkTaskCount,
+    MasterActorResponsiveness_BENCHMARK_Test,
+    ::testing::Values(
+        make_tuple(100, 10, 10, 10, 10, 50, 5),
+        make_tuple(1000, 10, 10, 10, 10, 10, 5)));
+
+
+// This test indirectly measures how the Master actor is affected by serving
+// '/state' requests. The response time for a lightweight '/health' endpoint
+// is taken as a load indicator. We set up a lot of master state from artificial
+// agents and send multiple '/state' queries while constantly probing '/health'.
+// As the baseline only '/health' is queried.
+//
+// NOTE: This test can deadlock if the number of libprocess worker threads is
+// insufficient. We observed deadlocks when
+// `(numClients >= LIBPROCESS_NUM_WORKER_THREADS - 3)`. Once MESOS-9400 is
+// fixed, we can add an assertion here.
+TEST_P(MasterActorResponsiveness_BENCHMARK_Test, WithV0StateLoad)
+{
+  size_t agentCount;
+  size_t frameworksPerAgent;
+  size_t tasksPerFramework;
+  size_t completedFrameworksPerAgent;
+  size_t tasksPerCompletedFramework;
+  size_t numRequests;
+  size_t numClients;
+
+  tie(agentCount,
+    frameworksPerAgent,
+    tasksPerFramework,
+    completedFrameworksPerAgent,
+    tasksPerCompletedFramework,
+    numRequests,
+    numClients) = GetParam();
+
+  const string indicatorEndpoint = "health";
+  const string stateEndpoint = "state";
+
+  // Disable authentication to avoid the overhead, since we don't care about
+  // it in this test.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.authenticate_agents = false;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  vector<Owned<TestSlave>> slaves;
+
+  for (size_t i = 0; i < agentCount; i++) {
+    SlaveID slaveId;
+    slaveId.set_value("agent" + stringify(i));
+
+    slaves.push_back(Owned<TestSlave>(new TestSlave(
+        master.get()->pid,
+        slaveId,
+        frameworksPerAgent,
+        tasksPerFramework,
+        completedFrameworksPerAgent,
+        tasksPerCompletedFramework)));
+  }
+
+  cout << "Test setup: " << agentCount << " agents with a total of "
+       << frameworksPerAgent * tasksPerFramework * agentCount
+       << " running tasks and "
+       << completedFrameworksPerAgent * tasksPerCompletedFramework * agentCount
+       << " completed tasks" << endl;
+
+  vector<Future<Nothing>> reregistered;
+
+  foreach (const Owned<TestSlave>& slave, slaves) {
+    reregistered.push_back(slave->reregister());
+  }
+
+  // Wait for all agents to finish reregistration.
+  await(reregistered).await();
+
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  // A helper sending a single request and measuring the time it takes to
+  // receive a response.
+  auto singleRequest = [master](const string& endpoint) -> Future<Duration> {
+    shared_ptr<Stopwatch> watch(new Stopwatch);
+    watch->start();
+
+    Future<http::Response> response = http::get(
+        master.get()->pid,
+        endpoint,
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    return response.then([watch](const http::Response& r) -> Future<Duration> {
+      watch->stop();
+      EXPECT_EQ(r.status, http::OK().status);
+      return watch->elapsed();
+    });
+  };
+
+  // Synchronizes completion of all lambdas sending requests.
+  atomic_bool stop = { false };
+
+  // A helper sending `numRequests` requests to `endpoint`. An early exit
+  // is possible if `stop` is set. Note that this lambda sets `stop` once
+  // `numRequests` requests have been sent. The intention is to synchronize
+  // completion across all running lambdas.
+  auto repeatRequests = [singleRequest, &stop](
+      const string& endpoint, size_t numRequests) -> vector<Duration> {
+    vector<Duration> durations;
+
+    size_t remaining = numRequests;
+    auto f = loop(
+        None(),
+        [=]() {
+          return singleRequest(endpoint);
+        },
+        [&remaining, &durations, &stop](
+            const Duration& d) -> ControlFlow<Nothing> {
+          durations.push_back(d);
+
+          if (--remaining <= 0) {
+            stop.store(true);
+          }
+
+          if (stop.load()) {
+            return Break();
+          } else {
+            return Continue();
+          }
+        });
+
+    f.await();
+    EXPECT_TRUE(f.isReady());
+
+    return durations;
+  };
+
+  auto printStats = [](const vector<Duration>& durations) {
+    Option<Statistics<Duration>> s =
+      Statistics<Duration>::from(durations.cbegin(), durations.cend());
+    EXPECT_SOME(s);
+
+    cout << "[" << s->min << ", " << s->p25 << ", " << s->p50 << ", "
+         << s->p75 << ", " << s->p90 << ", " << s->max << "]"
+         << " from " << s->count << " measurements" << endl;
+  };
+
+  // First measure the average response time for the `indicatorEndpoint` only
+  // as the baseline.
+  cout << "Baseline: launching " << numRequests
+       << " '/" << indicatorEndpoint<< "'" << " requests" << endl;
+
+  Future<vector<Duration>> indicatorFinished = async(
+      repeatRequests, indicatorEndpoint, numRequests);
+  indicatorFinished.await();
+  CHECK_READY(indicatorFinished);
+
+  cout << "Results [min, p25, p50, p75, p90, max]: " << endl
+       << "  '/" << indicatorEndpoint << "' -> ";
+  printStats(indicatorFinished.get());
+
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  // Now measure the average response times when requests for both
+  // `indicatorEndpoint` and `stateEndpoint` are sent in parallel.
+  // Stop when `numRequests` requests to `indicatorEndpoint` have been sent.
+  stop.store(false);
+
+  cout << "Benchmark: launching "
+       << numRequests << " '/" << indicatorEndpoint << "'"
+       << " requests with up to " << numClients << " * " << numRequests
+       << " '/" << stateEndpoint << "'" << " requests in background" << endl;
+
+  vector<Future<vector<Duration>>> stateFinished;
+  while (numClients-- > 0) {
+    stateFinished.push_back(async(
+        repeatRequests, stateEndpoint, numeric_limits<size_t>::max()));
+  }
+
+  indicatorFinished = async(
+      repeatRequests, indicatorEndpoint, numRequests);
+
+  Future<vector<vector<Duration>>> collected = collect(stateFinished);
+  collected.await();
+  CHECK_READY(collected);
+
+  indicatorFinished.await();
+  CHECK_READY(indicatorFinished);
+
+  // Aggregate response times for all `/state` clients.
+  vector<Duration> aggregatedState;
+  foreach (const vector<Duration>& v, collected.get()) {
+    aggregatedState.insert(aggregatedState.end(), v.cbegin(), v.cend());
+  }
+
+  cout << "Results [min, p25, p50, p75, p90, max]: " << endl
+       << "  '/" << indicatorEndpoint << "' -> ";
+  printStats(indicatorFinished.get());
+
+  cout << "  '/" << stateEndpoint << "' -> ";
+  printStats(aggregatedState);
+}
+
+
 class MasterMetricsQuery_BENCHMARK_Test
   : public MesosTest,
     public WithParamInterface<tuple<