You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2016/04/09 00:14:29 UTC
mesos git commit: Added an example framework using dynamic
reservation.
Repository: mesos
Updated Branches:
refs/heads/master 21937f974 -> 58b3a7a41
Added an example framework using dynamic reservation.
Review: https://reviews.apache.org/r/37168/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58b3a7a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58b3a7a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58b3a7a4
Branch: refs/heads/master
Commit: 58b3a7a414293df7074b6a5e5aeda4c81a141d10
Parents: 21937f9
Author: Klaus Ma <kl...@gmail.com>
Authored: Fri Apr 8 14:47:45 2016 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Fri Apr 8 15:09:03 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 6 +
src/examples/dynamic_reservation_framework.cpp | 403 +++++++++++++++++++
src/tests/dynamic_reservation_framework_test.sh | 35 ++
src/tests/examples_tests.cpp | 6 +-
4 files changed, 446 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b7f3f36..4375b03 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1652,6 +1652,11 @@ clean-python:
PHONY_TARGETS += clean-python
# Test (make check) binaries.
+check_PROGRAMS += dynamic-reservation-framework
+dynamic_reservation_framework_SOURCES = examples/dynamic_reservation_framework.cpp
+dynamic_reservation_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+dynamic_reservation_framework_LDADD = libmesos.la $(LDADD)
+
check_PROGRAMS += test-http-framework
test_http_framework_SOURCES = examples/test_http_framework.cpp
test_http_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -2048,6 +2053,7 @@ EXTRA_DIST += \
dist_check_SCRIPTS += \
tests/balloon_framework_test.sh \
+ tests/dynamic_reservation_framework_test.sh \
tests/test_http_framework_test.sh \
tests/java_exception_test.sh \
tests/java_framework_test.sh \
http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/examples/dynamic_reservation_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/dynamic_reservation_framework.cpp b/src/examples/dynamic_reservation_framework.cpp
new file mode 100644
index 0000000..b51b42b
--- /dev/null
+++ b/src/examples/dynamic_reservation_framework.cpp
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/resources.hpp>
+#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <mesos/authorizer/acls.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+using namespace mesos;
+
+using std::cerr;
+using std::endl;
+using std::string;
+using std::vector;
+
+const int32_t CPUS_PER_TASK = 1; // Value of the "cpus" resource of each task.
+const int32_t MEM_PER_TASK = 128; // Value of the "mem" resource of each task.
+
+// The framework reserves resources to run at most one task at a time
+// on each agent; the resources are reserved when an agent is offered to
+// the framework for the first time, and are unreserved once every task
+// has been launched and the agent is idle. It aborts on a lost, killed or failed task.
+class DynamicReservationScheduler : public Scheduler
+{
+public:
+ DynamicReservationScheduler(
+ const string& _command,
+ const string& _role,
+ const string& _principal)
+ : command(_command),
+ role(_role),
+ tasksLaunched(0),
+ tasksFinished(0),
+ totalTasks(5), // No more tasks are launched once this count is reached.
+ principal(_principal)
+ {
+ reservationInfo.set_principal(principal);
+ taskResources = TASK_RESOURCES.flatten(role, reservationInfo); // Used for RESERVE, launch and UNRESERVE.
+ }
+
+ virtual ~DynamicReservationScheduler() {}
+
+ virtual void registered(SchedulerDriver*,
+ const FrameworkID&,
+ const MasterInfo&)
+ {
+ LOG(INFO) << "Registered!";
+ }
+
+ virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {}
+
+ virtual void disconnected(SchedulerDriver* driver) {}
+
+ virtual void resourceOffers(SchedulerDriver* driver,
+ const vector<Offer>& offers)
+ {
+ foreach (const Offer& offer, offers) {
+ LOG(INFO) << "Received offer " << offer.id() << " with "
+ << offer.resources();
+
+ // If this is the first offer the framework has seen from this agent,
+ // the agent starts in `State::INIT`; the RESERVE operation is sent
+ // to the master further down in this loop iteration.
+ if (!states.contains(offer.slave_id())) {
+ // If all tasks were already launched, do not reserve more resources;
+ // wait for the running tasks to finish and unreserve resources.
+ if (tasksLaunched == totalTasks) {
+ continue;
+ }
+
+ states[offer.slave_id()] = State::INIT;
+ };
+
+ const State state = states[offer.slave_id()];
+
+ switch (state) {
+ case State::INIT: {
+ // The framework reserves resources from this offer for only one task;
+ // the task will be launched when the reserved resources are
+ // re-offered to this framework.
+ Resources resources = offer.resources();
+ Resources unreserved = resources.unreserved();
+ if (!unreserved.contains(TASK_RESOURCES)) {
+ LOG(INFO) << "Failed to reserve resources for task in offer "
+ << stringify(offer.id());
+ break; // NOTE(review): the offer is neither accepted nor declined here.
+ }
+ driver->acceptOffers({offer.id()}, {RESERVE(taskResources)});
+ states[offer.slave_id()] = State::RESERVING;
+ break;
+ }
+ case State::RESERVING: {
+ Resources resources = offer.resources();
+ Resources reserved = resources.reserved(role);
+ if (!reserved.contains(taskResources)) {
+ break;
+ }
+ states[offer.slave_id()] = State::RESERVED;
+
+ // We fall through here to save an offer cycle.
+ // [[fallthrough]]
+ }
+ case State::RESERVED: {
+ Resources resources = offer.resources();
+ Resources reserved = resources.reserved(role);
+
+ CHECK(reserved.contains(taskResources));
+
+ // If all tasks were launched, unreserve this agent's resources.
+ if (tasksLaunched == totalTasks) {
+ driver->acceptOffers({offer.id()}, {UNRESERVE(taskResources)});
+ states[offer.slave_id()] = State::UNRESERVING;
+ break;
+ }
+
+ // Otherwise there is still at least one task left to launch.
+ CHECK(tasksLaunched < totalTasks);
+
+ // Launch one task on the reserved resources.
+ const string& taskId = stringify(tasksLaunched++);
+ LOG(INFO) << "Launching task " << taskId << " using offer "
+ << offer.id();
+ TaskInfo task;
+ task.set_name("Task " + taskId + ": " + command);
+ task.mutable_task_id()->set_value(taskId);
+ task.mutable_slave_id()->MergeFrom(offer.slave_id());
+ task.mutable_command()->set_shell(true);
+ task.mutable_command()->set_value(command);
+ task.mutable_resources()->MergeFrom(taskResources);
+ driver->launchTasks(offer.id(), {task});
+ states[offer.slave_id()] = State::TASK_RUNNING;
+ break;
+ }
+ case State::TASK_RUNNING:
+ LOG(INFO) << "The task on " << offer.slave_id()
+ << " is running, waiting for task done";
+ break;
+ case State::UNRESERVING: {
+ Resources resources = offer.resources();
+ Resources reserved = resources.reserved(role);
+ if (!reserved.contains(taskResources)) {
+ states[offer.slave_id()] = State::UNRESERVED;
+ }
+ break;
+ }
+ case State::UNRESERVED:
+ // If the state of the slave is UNRESERVED, ignore the offer. The driver
+ // is stopped when all tasks are done and all resources are unreserved.
+ break;
+ }
+ }
+
+ // If all tasks are done and the resources on every tracked slave are
+ // unreserved, stop the driver.
+ if (tasksFinished == totalTasks) {
+ // Keep waiting while any tracked slave is not yet UNRESERVED.
+ foreachvalue (const State& state, states) {
+ if (state != State::UNRESERVED) {
+ return;
+ }
+ }
+
+ driver->stop();
+ }
+ }
+
+ virtual void offerRescinded(SchedulerDriver* driver,
+ const OfferID& offerId) {}
+
+ virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
+ {
+ const string& taskId = status.task_id().value();
+
+ if (status.state() == TASK_FINISHED) {
+ ++tasksFinished;
+ // Move the slave back to RESERVED so that another task can reuse it.
+ CHECK(states[status.slave_id()] == State::TASK_RUNNING);
+ states[status.slave_id()] = State::RESERVED;
+ LOG(INFO) << "Task " << taskId << " is finished at slave "
+ << status.slave_id();
+ } else {
+ LOG(INFO) << "Task " << taskId << " is in state " << status.state();
+ }
+
+ if (status.state() == TASK_LOST ||
+ status.state() == TASK_KILLED ||
+ status.state() == TASK_FAILED) {
+ LOG(INFO) << "Aborting because task " << taskId
+ << " is in unexpected state " << status.state()
+ << " with reason " << status.reason()
+ << " from source " << status.source()
+ << " with message '" << status.message() << "'";
+ driver->abort();
+ }
+
+ if (tasksFinished == totalTasks) {
+ LOG(INFO) << "All tasks done, waiting for unreserving resources";
+ }
+ }
+
+ virtual void frameworkMessage(SchedulerDriver* driver,
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ const string& data) {}
+
+ virtual void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId) {}
+
+ virtual void executorLost(SchedulerDriver* driver,
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ int status) {}
+
+ virtual void error(SchedulerDriver* driver, const string& message)
+ {
+ LOG(ERROR) << message;
+ }
+
+private:
+ string command;
+ string role;
+ int tasksLaunched;
+ int tasksFinished;
+ int totalTasks;
+ string principal;
+
+
+ // State        | Next State   | Action
+ // ----------------------------------------------------------------------
+ // INIT         | RESERVING    | Transfer to RESERVING after sending the
+ //              |              | reserve operation to the master.
+ // ----------------------------------------------------------------------
+ // RESERVING    | RESERVED     | Transfer to RESERVED when the reserved
+ //              |              | resources are offered by the master.
+ // ----------------------------------------------------------------------
+ // RESERVED     | TASK_RUNNING | Transfer to TASK_RUNNING after launching
+ //              |              | a task on the reserved resources.
+ //              | UNRESERVING  | Transfer to UNRESERVING if all tasks are
+ //              |              | launched; sending the unreserve operation
+ //              |              | to the master.
+ // ----------------------------------------------------------------------
+ // TASK_RUNNING | RESERVED     | Transfer to RESERVED when the running task
+ //              |              | is finished; the resources are reused by
+ //              |              | other tasks.
+ // ----------------------------------------------------------------------
+ // UNRESERVING  | UNRESERVED   | Transfer to UNRESERVED if the offer did
+ //              |              | not include the reserved resources.
+ // ----------------------------------------------------------------------
+ // UNRESERVED   | -            | If all tasks are finished and every slave
+ //              |              | is UNRESERVED, the driver is stopped.
+
+ enum State {
+ INIT, // The framework receives an offer from the slave for the first time.
+ RESERVING, // The framework sent the RESERVE request to master.
+ RESERVED, // The framework got reserved resources from master.
+ TASK_RUNNING, // The task was dispatched to master.
+ UNRESERVING, // The framework sent the UNRESERVE request to master.
+ UNRESERVED, // The resources were unreserved.
+ };
+
+ hashmap<SlaveID, State> states; // Per-slave state of the state machine above.
+ Resource::ReservationInfo reservationInfo;
+ Resources taskResources;
+
+ static const Resources TASK_RESOURCES; // Per-task resources before flattening to `role`.
+
+ Offer::Operation RESERVE(Resources resources) // Builds a RESERVE operation.
+ {
+ Offer::Operation operation;
+ operation.set_type(Offer::Operation::RESERVE);
+ operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
+ return operation;
+ }
+
+ Offer::Operation UNRESERVE(Resources resources) // Builds an UNRESERVE operation.
+ {
+ Offer::Operation operation;
+ operation.set_type(Offer::Operation::UNRESERVE);
+ operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
+ return operation;
+ }
+};
+
+
+const Resources DynamicReservationScheduler::TASK_RESOURCES = Resources::parse(
+ "cpus:" + stringify(CPUS_PER_TASK) +
+ ";mem:" + stringify(MEM_PER_TASK)).get(); // Parses "cpus:1;mem:128".
+
+
+class Flags : public virtual flags::FlagsBase // Command-line flags of this example.
+{
+public:
+ Flags()
+ {
+ add(&master,
+ "master",
+ "The master to connect to. May be one of:\n"
+ " master@addr:port (The PID of the master)\n"
+ " zk://host1:port1,host2:port2,.../path\n"
+ " zk://username:password@host1:port1,host2:port2,.../path\n"
+ " file://path/to/file (where file contains one of the above)");
+
+ add(&role,
+ "role",
+ "Role to use when registering");
+
+ add(&principal,
+ "principal",
+ "The principal used to identify this framework",
+ "test");
+
+ add(&command,
+ "command",
+ "The command to run for each task.",
+ "echo hello");
+ }
+
+ Option<string> master; // Required: main() exits with the usage text if unset.
+ Option<string> role; // Required: main() also rejects the default "*" role.
+ string principal; // "test" is passed to add(); presumably the default value.
+ string command; // "echo hello" is passed to add(); presumably the default value.
+};
+
+// Runs the example; exits with 0 only if the driver ends as DRIVER_STOPPED.
+int main(int argc, char** argv)
+{
+  Flags flags;
+  Try<Nothing> load = flags.load(None(), argc, argv);
+  if (load.isError()) {
+    EXIT(EXIT_FAILURE) << flags.usage(load.error());
+  } else if (flags.master.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing --master");
+  } else if (flags.role.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing --role");
+  } else if (flags.role.get() == "*") {
+    EXIT(EXIT_FAILURE)
+      << flags.usage("Role is incorrect; the default '*' role cannot be used");
+  }
+
+  FrameworkInfo framework;
+  framework.set_user(""); // Mesos'll fill in the current user.
+  framework.set_name("Dynamic Reservation Framework (C++)");
+  framework.set_role(flags.role.get());
+  framework.set_principal(flags.principal);
+
+  DynamicReservationScheduler scheduler(
+      flags.command,
+      flags.role.get(),
+      flags.principal);
+
+  if (flags.master.get() == "local") {
+    // Local master: no authentication; any principal may register the role.
+    os::setenv("MESOS_AUTHENTICATE", "false");
+
+    ACLs acls;
+    ACL::RegisterFramework* acl = acls.add_register_frameworks();
+    acl->mutable_principals()->set_type(ACL::Entity::ANY);
+    acl->mutable_roles()->add_values(flags.role.get());
+    os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
+  }
+
+  // A stack-allocated driver is destroyed automatically; no `delete` needed.
+  MesosSchedulerDriver driver(
+      &scheduler,
+      framework,
+      flags.master.get());
+
+  const int status = driver.run() == DRIVER_STOPPED ? 0 : 1;
+
+  // Ensure that the driver process terminates.
+  driver.stop();
+
+  return status;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/tests/dynamic_reservation_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/dynamic_reservation_framework_test.sh b/src/tests/dynamic_reservation_framework_test.sh
new file mode 100755
index 0000000..00a10f1
--- /dev/null
+++ b/src/tests/dynamic_reservation_framework_test.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+ echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+ exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+ echo "Failed to find MESOS_BUILD_DIR in environment" && \
+ exit 1
+
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set local Mesos runner to use 3 slaves
+export MESOS_NUM_SLAVES=3
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the C++ test framework executes without crashing (returns 0).
+exec ${MESOS_BUILD_DIR}/src/dynamic-reservation-framework --master=local --role=test
http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index 6ddac17..ac513ce 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -18,19 +18,18 @@
#include "tests/script.hpp"
-
// Run each of the sample frameworks in local mode.
TEST_SCRIPT(ExamplesTest, TestFramework, "test_framework_test.sh")
TEST_SCRIPT(ExamplesTest, NoExecutorFramework, "no_executor_framework_test.sh")
-
TEST_SCRIPT(ExamplesTest, TestHTTPFramework,
"test_http_framework_test.sh")
-
TEST_SCRIPT(ExamplesTest, PersistentVolumeFramework,
"persistent_volume_framework_test.sh")
+TEST_SCRIPT(ExamplesTest, DynamicReservationFramework,
+ "dynamic_reservation_framework_test.sh")
#ifdef MESOS_HAS_JAVA
TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
@@ -38,7 +37,6 @@ TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
TEST_SCRIPT(ExamplesTest, JavaLog, "java_log_test.sh")
#endif
-
#ifdef MESOS_HAS_PYTHON
TEST_SCRIPT(ExamplesTest, PythonFramework, "python_framework_test.sh")
#endif