You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/11/03 18:12:10 UTC

mesos git commit: Added a benchmark for agent reregistration during master failover.

Repository: mesos
Updated Branches:
  refs/heads/master de19f7381 -> ac0fa2814


Added a benchmark for agent reregistration during master failover.

Review: https://reviews.apache.org/r/63174


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac0fa281
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac0fa281
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac0fa281

Branch: refs/heads/master
Commit: ac0fa281472c2ba891f7bd0837fbd728ace73039
Parents: de19f73
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Wed Oct 18 01:53:11 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Nov 3 11:09:55 2017 -0700

----------------------------------------------------------------------
 src/Makefile.am                 |   1 +
 src/tests/CMakeLists.txt        |   1 +
 src/tests/master_benchmarks.cpp | 342 +++++++++++++++++++++++++++++++++++
 3 files changed, 344 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac0fa281/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1c97b1f..955f01a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2424,6 +2424,7 @@ mesos_tests_SOURCES =						\
   tests/main.cpp						\
   tests/master_allocator_tests.cpp				\
   tests/master_authorization_tests.cpp				\
+  tests/master_benchmarks.cpp					\
   tests/master_contender_detector_tests.cpp			\
   tests/master_maintenance_tests.cpp				\
   tests/master_quota_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac0fa281/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 386e047..81c85d9 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -157,6 +157,7 @@ if (NOT WIN32)
     logging_tests.cpp
     master_allocator_tests.cpp
     master_authorization_tests.cpp
+    master_benchmarks.cpp
     master_contender_detector_tests.cpp
     master_quota_tests.cpp
     master_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac0fa281/src/tests/master_benchmarks.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_benchmarks.cpp b/src/tests/master_benchmarks.cpp
new file mode 100644
index 0000000..6c3a8e5
--- /dev/null
+++ b/src/tests/master_benchmarks.cpp
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <list>
+#include <string>
+#include <tuple>
+
+#include <mesos/resources.hpp>
+#include <mesos/version.hpp>
+
+#include <process/clock.hpp>
+#include <process/collect.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/stopwatch.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "tests/mesos.hpp"
+
+using process::await;
+using process::Clock;
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::PID;
+using process::ProcessBase;
+using process::Promise;
+using process::spawn;
+using process::terminate;
+using process::UPID;
+
+using std::cout;
+using std::endl;
+using std::list;
+using std::make_tuple;
+using std::string;
+using std::tie;
+using std::tuple;
+
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// Builds the `SlaveInfo` for a fake agent. Every agent advertises the same
+// fixed resources, and the agent ID string doubles as its hostname.
+static SlaveInfo createSlaveInfo(const SlaveID& slaveId)
+{
+  // Parsed on the first call only; later agents reuse the parsed value.
+  static const Resources agentResources =
+    Resources::parse("cpus:20;mem:10240").get();
+
+  SlaveInfo info;
+  info.mutable_id()->CopyFrom(slaveId);
+  info.set_hostname(slaveId.value()); // Stand-in hostname.
+  *info.mutable_resources() = agentResources;
+
+  return info;
+}
+
+
+// Returns a copy of `DEFAULT_FRAMEWORK_INFO` whose ID is `frameworkId`.
+static FrameworkInfo createFrameworkInfo(const FrameworkID& frameworkId)
+{
+  FrameworkInfo info(DEFAULT_FRAMEWORK_INFO);
+  info.mutable_id()->CopyFrom(frameworkId);
+  return info;
+}
+
+
+// Creates a small task on `slaveId` that runs "dummy command" and carries
+// ten labels: `key0`=`value0` through `key9`=`value9`.
+static TaskInfo createTaskInfo(const SlaveID& slaveId)
+{
+  // Parsed on the first call only; every later task reuses the result.
+  static const Resources taskResources =
+    Resources::parse("cpus:0.1;mem:5").get();
+
+  TaskInfo task = createTask(slaveId, taskResources, "dummy command");
+
+  Labels& labels = *task.mutable_labels();
+
+  size_t n = 0;
+  while (n < 10) {
+    const string suffix = stringify(n);
+    labels.add_labels()->CopyFrom(
+        protobuf::createLabel("key" + suffix, "value" + suffix));
+    ++n;
+  }
+
+  return task;
+}
+
+
+// A fake agent, currently used only for testing reregistrations. It builds a
+// single `ReregisterSlaveMessage` in `initialize()`, sends it to the master
+// on `reregister()`, and answers the master's pings.
+class TestSlaveProcess : public ProtobufProcess<TestSlaveProcess>
+{
+public:
+  TestSlaveProcess(
+      const UPID& _masterPid,
+      const SlaveID& _slaveId,
+      size_t _frameworksPerAgent,
+      size_t _tasksPerFramework,
+      size_t _completedFrameworksPerAgent,
+      size_t _tasksPerCompletedFramework)
+    : ProcessBase(process::ID::generate("test-slave")),
+      masterPid(_masterPid),
+      slaveId(_slaveId),
+      frameworksPerAgent(_frameworksPerAgent),
+      tasksPerFramework(_tasksPerFramework),
+      completedFrameworksPerAgent(_completedFrameworksPerAgent),
+      tasksPerCompletedFramework(_tasksPerCompletedFramework) {}
+
+  void initialize() override
+  {
+    install<SlaveReregisteredMessage>(&Self::reregistered);
+    install<PingSlaveMessage>(
+        &Self::ping,
+        &PingSlaveMessage::connected);
+
+    // Prepare a `ReregisterSlaveMessage` that simulates the real world
+    // scenario.
+    // TODO(xujyan): Notable things missing include:
+    // - `ExecutorInfo`s
+    // - Task statuses
+    SlaveInfo slaveInfo = createSlaveInfo(slaveId);
+    message.mutable_slave()->Swap(&slaveInfo);
+    message.set_version(MESOS_VERSION);
+
+    // One counter generates every framework ID so that completed frameworks
+    // get IDs distinct from the active ones. Because the counter is shared,
+    // the second loop's bound must include the frameworks already created
+    // by the first loop.
+    size_t id = 0;
+
+    // Active frameworks, each with `tasksPerFramework` running tasks.
+    for (; id < frameworksPerAgent; id++) {
+      FrameworkID frameworkId;
+      frameworkId.set_value("framework" + stringify(id));
+
+      FrameworkInfo framework = createFrameworkInfo(frameworkId);
+      message.add_frameworks()->Swap(&framework);
+
+      for (size_t j = 0; j < tasksPerFramework; j++) {
+        Task task = protobuf::createTask(
+            createTaskInfo(slaveId),
+            TASK_RUNNING,
+            frameworkId);
+        message.add_tasks()->Swap(&task);
+      }
+    }
+
+    // Completed frameworks, each with `tasksPerCompletedFramework` finished
+    // tasks.
+    for (; id < frameworksPerAgent + completedFrameworksPerAgent; id++) {
+      Archive::Framework* completedFramework =
+        message.add_completed_frameworks();
+
+      FrameworkID frameworkId;
+      frameworkId.set_value("framework" + stringify(id));
+
+      FrameworkInfo framework = createFrameworkInfo(frameworkId);
+      completedFramework->mutable_framework_info()->Swap(&framework);
+
+      for (size_t j = 0; j < tasksPerCompletedFramework; j++) {
+        Task task = protobuf::createTask(
+            createTaskInfo(slaveId),
+            TASK_FINISHED,
+            frameworkId);
+        completedFramework->add_tasks()->Swap(&task);
+      }
+    }
+  }
+
+  // Sends the prepared `ReregisterSlaveMessage` to the master. The returned
+  // future is satisfied when a `SlaveReregisteredMessage` arrives; every
+  // call returns the future of the same promise.
+  Future<Nothing> reregister()
+  {
+    send(masterPid, message);
+    return promise.future();
+  }
+
+  TestSlaveProcess(const TestSlaveProcess& other) = delete;
+  TestSlaveProcess& operator=(const TestSlaveProcess& other) = delete;
+
+private:
+  // Satisfies the future handed out by `reregister()`.
+  void reregistered(const SlaveReregisteredMessage&)
+  {
+    promise.set(Nothing());
+  }
+
+  // We need to answer pings to keep the agent registered.
+  void ping(const UPID& from, bool)
+  {
+    send(from, PongSlaveMessage());
+  }
+
+  const UPID masterPid;
+  const SlaveID slaveId;
+  const size_t frameworksPerAgent;
+  const size_t tasksPerFramework;
+  const size_t completedFrameworksPerAgent;
+  const size_t tasksPerCompletedFramework;
+
+  // Built once in `initialize()` and re-sent by each `reregister()` call.
+  ReregisterSlaveMessage message;
+
+  // Set by `reregistered()`; its future is returned by `reregister()`.
+  Promise<Nothing> promise;
+};
+
+
+// Owns a `TestSlaveProcess`: the process is spawned on construction and is
+// terminated and waited for on destruction.
+class TestSlave
+{
+public:
+  TestSlave(
+      const UPID& masterPid,
+      const SlaveID& slaveId,
+      size_t frameworksPerAgent,
+      size_t tasksPerFramework,
+      size_t completedFrameworksPerAgent,
+      size_t tasksPerCompletedFramework)
+    : process(new TestSlaveProcess(
+          masterPid,
+          slaveId,
+          frameworksPerAgent,
+          tasksPerFramework,
+          completedFrameworksPerAgent,
+          tasksPerCompletedFramework))
+  {
+    spawn(process.get());
+  }
+
+  // Non-copyable: a copy would share `process`, and the first copy to be
+  // destroyed would terminate the process out from under the others.
+  TestSlave(const TestSlave& other) = delete;
+  TestSlave& operator=(const TestSlave& other) = delete;
+
+  ~TestSlave()
+  {
+    terminate(process.get());
+    process::wait(process.get());
+  }
+
+  // Asks the agent process to send its `ReregisterSlaveMessage`. The future
+  // is the one returned by `TestSlaveProcess::reregister()`.
+  Future<Nothing> reregister()
+  {
+    return dispatch(process.get(), &TestSlaveProcess::reregister);
+  }
+
+private:
+  Owned<TestSlaveProcess> process;
+};
+
+
+// Parameterized fixture for the master failover benchmarks. The parameter
+// tuple holds, in order: agentCount, frameworksPerAgent, tasksPerFramework,
+// completedFrameworksPerAgent and tasksPerCompletedFramework.
+struct MasterFailover_BENCHMARK_Test
+  : MesosTest,
+    WithParamInterface<tuple<size_t, size_t, size_t, size_t, size_t>> {};
+
+
+// Each value tuple is, in order:
+// - agentCount
+// - frameworksPerAgent
+// - tasksPerFramework (running tasks per framework on each agent)
+// - completedFrameworksPerAgent
+// - tasksPerCompletedFramework (finished tasks per completed framework on
+//   each agent)
+// So each agent carries frameworksPerAgent * tasksPerFramework running tasks
+// and completedFrameworksPerAgent * tasksPerCompletedFramework completed tasks.
+INSTANTIATE_TEST_CASE_P(
+    AgentFrameworkTaskCount,
+    MasterFailover_BENCHMARK_Test,
+    ::testing::Values(
+        make_tuple(2000, 5, 10, 5, 10),
+        make_tuple(2000, 5, 20, 0, 0),
+        make_tuple(20000, 1, 5, 0, 0)));
+
+
+// Benchmarks agent reregistration against a freshly started master. The
+// stopwatch runs from when the agents are told to reregister until every one
+// of them has received `SlaveReregisteredMessage`.
+TEST_P(MasterFailover_BENCHMARK_Test, AgentReregistrationDelay)
+{
+  const auto& param = GetParam();
+  const size_t agentCount = std::get<0>(param);
+  const size_t frameworksPerAgent = std::get<1>(param);
+  const size_t tasksPerFramework = std::get<2>(param);
+  const size_t completedFrameworksPerAgent = std::get<3>(param);
+  const size_t tasksPerCompletedFramework = std::get<4>(param);
+
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_agents = false;
+
+  // A replicated-log registry better simulates the production scenario.
+  flags.registry = "replicated_log";
+
+  Try<Owned<cluster::Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  list<TestSlave> agents;
+
+  for (size_t index = 0; index < agentCount; ++index) {
+    SlaveID agentId;
+    agentId.set_value("agent" + stringify(index));
+
+    agents.emplace_back(
+        master.get()->pid,
+        agentId,
+        frameworksPerAgent,
+        tasksPerFramework,
+        completedFrameworksPerAgent,
+        tasksPerCompletedFramework);
+  }
+
+  // Settle the clock so every agent is ready to reregister before the
+  // stopwatch starts.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  list<Future<Nothing>> reregistered;
+
+  cout << "Starting reregistration for all agents" << endl;
+
+  Stopwatch watch;
+  watch.start();
+
+  for (TestSlave& agent : agents) {
+    reregistered.push_back(agent.reregister());
+  }
+
+  // Block until every future in `reregistered` has completed.
+  await(reregistered).await();
+
+  watch.stop();
+
+  const size_t runningTaskCount =
+    agentCount * frameworksPerAgent * tasksPerFramework;
+  const size_t completedTaskCount =
+    agentCount * completedFrameworksPerAgent * tasksPerCompletedFramework;
+
+  cout << "Reregistered " << agentCount << " agents with a total of "
+       << runningTaskCount
+       << " running tasks and "
+       << completedTaskCount
+       << " completed tasks in "
+       << watch.elapsed() << endl;
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {