You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/11/20 17:38:48 UTC
[mesos] branch master updated: Added
MasterActorResponsiveness_BENCHMARK_Test.
This is an automated email from the ASF dual-hosted git repository.
alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 40dc508 Added MasterActorResponsiveness_BENCHMARK_Test.
40dc508 is described below
commit 40dc508d59d547e867746bc6b5b82ced849687f8
Author: Alexander Rukletsov <ru...@gmail.com>
AuthorDate: Sun Nov 18 05:09:39 2018 +0100
Added MasterActorResponsiveness_BENCHMARK_Test.
See summary.
Review: https://reviews.apache.org/r/68131/
---
src/tests/master_benchmarks.cpp | 233 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 232 insertions(+), 1 deletion(-)
diff --git a/src/tests/master_benchmarks.cpp b/src/tests/master_benchmarks.cpp
index b6d6dc7..c9ee509 100644
--- a/src/tests/master_benchmarks.cpp
+++ b/src/tests/master_benchmarks.cpp
@@ -14,6 +14,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <atomic>
+#include <limits>
+#include <memory>
#include <string>
#include <tuple>
#include <vector>
@@ -21,13 +24,17 @@
#include <mesos/resources.hpp>
#include <mesos/version.hpp>
+#include <process/async.hpp>
#include <process/clock.hpp>
#include <process/collect.hpp>
#include <process/future.hpp>
+#include <process/loop.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <process/statistics.hpp>
+#include <stout/duration.hpp>
#include <stout/stopwatch.hpp>
#include "common/protobuf_utils.hpp"
@@ -36,21 +43,32 @@
namespace http = process::http;
+using process::async;
using process::await;
+using process::collect;
+using process::Break;
using process::Clock;
+using process::Continue;
+using process::ControlFlow;
using process::Failure;
using process::Future;
+using process::loop;
using process::Owned;
using process::PID;
using process::ProcessBase;
using process::Promise;
using process::spawn;
+using process::Statistics;
using process::terminate;
using process::UPID;
+using process::wait;
+using std::atomic_bool;
using std::cout;
using std::endl;
using std::make_tuple;
+using std::numeric_limits;
+using std::shared_ptr;
using std::string;
using std::tie;
using std::tuple;
@@ -237,7 +255,7 @@ public:
~TestSlave()
{
terminate(process.get());
- process::wait(process.get());
+ wait(process.get());
}
Future<Nothing> reregister()
@@ -482,6 +500,219 @@ TEST_P(MasterStateQuery_BENCHMARK_Test, GetState)
}
+class MasterActorResponsiveness_BENCHMARK_Test
+ : public MesosTest,
+ public WithParamInterface<tuple<
+ size_t, size_t, size_t, size_t, size_t, size_t, size_t>> {};
+
+
+INSTANTIATE_TEST_CASE_P(
+ AgentFrameworkTaskCount,
+ MasterActorResponsiveness_BENCHMARK_Test,
+ ::testing::Values(
+ make_tuple(100, 10, 10, 10, 10, 50, 5),
+ make_tuple(1000, 10, 10, 10, 10, 10, 5)));
+
+
+// This test indirectly measures how the Master actor is affected by serving
+// '/state' requests. The response time for a lightweight '/health' endpoint
+// is taken as a load indicator. We set up a lot of master state from artificial
+// agents and send multiple '/state' queries while constantly probing '/health'.
+// As the baseline, only '/health' is queried.
+//
+// NOTE: This test can deadlock if the number of libprocess worker threads is
+// insufficient. We observed deadlocks when
+// `(numClients >= LIBPROCESS_NUM_WORKER_THREADS - 3)`. Once MESOS-9400 is
+// fixed, we can add an assertion here.
+TEST_P(MasterActorResponsiveness_BENCHMARK_Test, WithV0StateLoad)
+{
+ size_t agentCount;
+ size_t frameworksPerAgent;
+ size_t tasksPerFramework;
+ size_t completedFrameworksPerAgent;
+ size_t tasksPerCompletedFramework;
+ size_t numRequests;
+ size_t numClients;
+
+ tie(agentCount,
+ frameworksPerAgent,
+ tasksPerFramework,
+ completedFrameworksPerAgent,
+ tasksPerCompletedFramework,
+ numRequests,
+ numClients) = GetParam();
+
+ const string indicatorEndpoint = "health";
+ const string stateEndpoint = "state";
+
+ // Disable authentication to avoid the overhead, since we don't care about
+ // it in this test.
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.authenticate_agents = false;
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ vector<Owned<TestSlave>> slaves;
+
+ for (size_t i = 0; i < agentCount; i++) {
+ SlaveID slaveId;
+ slaveId.set_value("agent" + stringify(i));
+
+ slaves.push_back(Owned<TestSlave>(new TestSlave(
+ master.get()->pid,
+ slaveId,
+ frameworksPerAgent,
+ tasksPerFramework,
+ completedFrameworksPerAgent,
+ tasksPerCompletedFramework)));
+ }
+
+ cout << "Test setup: " << agentCount << " agents with a total of "
+ << frameworksPerAgent * tasksPerFramework * agentCount
+ << " running tasks and "
+ << completedFrameworksPerAgent * tasksPerCompletedFramework * agentCount
+ << " completed tasks" << endl;
+
+ vector<Future<Nothing>> reregistered;
+
+ foreach (const Owned<TestSlave>& slave, slaves) {
+ reregistered.push_back(slave->reregister());
+ }
+
+ // Wait for all agents to finish reregistration.
+ await(reregistered).await();
+
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ // A helper sending a single request and measuring the time it takes to
+ // receive a response.
+ auto singleRequest = [master](const string& endpoint) -> Future<Duration> {
+ shared_ptr<Stopwatch> watch(new Stopwatch);
+ watch->start();
+
+ Future<http::Response> response = http::get(
+ master.get()->pid,
+ endpoint,
+ None(),
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+ return response.then([watch](const http::Response& r) -> Future<Duration> {
+ watch->stop();
+ EXPECT_EQ(r.status, http::OK().status);
+ return watch->elapsed();
+ });
+ };
+
+ // Synchronizes completion of all lambdas sending requests.
+ atomic_bool stop = { false };
+
+ // A helper sending `numRequests` requests to `endpoint`. An early exit
+ // is possible if `stop` is set. Note that this lambda sets `stop` once
+ // `numRequests` requests have been sent. The intention is to synchronize
+ // completion across all running lambdas.
+ auto repeatRequests = [singleRequest, &stop](
+ const string& endpoint, size_t numRequests) -> vector<Duration> {
+ vector<Duration> durations;
+
+ size_t remaining = numRequests;
+ auto f = loop(
+ None(),
+ [=]() {
+ return singleRequest(endpoint);
+ },
+ [&remaining, &durations, &stop](
+ const Duration& d) -> ControlFlow<Nothing> {
+ durations.push_back(d);
+
+ if (--remaining <= 0) {
+ stop.store(true);
+ }
+
+ if (stop.load()) {
+ return Break();
+ } else {
+ return Continue();
+ }
+ });
+
+ f.await();
+ EXPECT_TRUE(f.isReady());
+
+ return durations;
+ };
+
+ auto printStats = [](const vector<Duration>& durations) {
+ Option<Statistics<Duration>> s =
+ Statistics<Duration>::from(durations.cbegin(), durations.cend());
+ EXPECT_SOME(s);
+
+ cout << "[" << s->min << ", " << s->p25 << ", " << s->p50 << ", "
+ << s->p75 << ", " << s->p90 << ", " << s->max << "]"
+ << " from " << s->count << " measurements" << endl;
+ };
+
+ // First measure the average response time for the `indicatorEndpoint` only
+ // as the baseline.
+ cout << "Baseline: launching " << numRequests
+ << " '/" << indicatorEndpoint<< "'" << " requests" << endl;
+
+ Future<vector<Duration>> indicatorFinished = async(
+ repeatRequests, indicatorEndpoint, numRequests);
+ indicatorFinished.await();
+ CHECK_READY(indicatorFinished);
+
+ cout << "Results [min, p25, p50, p75, p90, max]: " << endl
+ << " '/" << indicatorEndpoint << "' -> ";
+ printStats(indicatorFinished.get());
+
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ // Now measure the average response times when requests for both
+ // `indicatorEndpoint` and `stateEndpoint` are sent in parallel.
+ // Stop when `numRequests` to `stateEndpoint` have been sent.
+ stop.store(false);
+
+ cout << "Benchmark: launching "
+ << numRequests << " '/" << indicatorEndpoint << "'"
+ << " requests with up to " << numClients << " * " << numRequests
+ << " '/" << stateEndpoint << "'" << " requests in background" << endl;
+
+ vector<Future<vector<Duration>>> stateFinished;
+ while (numClients-- > 0) {
+ stateFinished.push_back(async(
+ repeatRequests, stateEndpoint, numeric_limits<size_t>::max()));
+ }
+
+ indicatorFinished = async(
+ repeatRequests, indicatorEndpoint, numRequests);
+
+ Future<vector<vector<Duration>>> collected = collect(stateFinished);
+ collected.await();
+ CHECK_READY(collected);
+
+ indicatorFinished.await();
+ CHECK_READY(indicatorFinished);
+
+ // Aggregate response times for all `/state` clients.
+ vector<Duration> aggregatedState;
+ foreach (const vector<Duration>& v, collected.get()) {
+ aggregatedState.insert(aggregatedState.end(), v.cbegin(), v.cend());
+ }
+
+ cout << "Results [min, p25, p50, p75, p90, max]: " << endl
+ << " '/" << indicatorEndpoint << "' -> ";
+ printStats(indicatorFinished.get());
+
+ cout << " '/" << stateEndpoint << "' -> ";
+ printStats(aggregatedState);
+}
+
+
class MasterMetricsQuery_BENCHMARK_Test
: public MesosTest,
public WithParamInterface<tuple<