You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2016/04/09 00:14:29 UTC

mesos git commit: Added an example framework using dynamic reservation.

Repository: mesos
Updated Branches:
  refs/heads/master 21937f974 -> 58b3a7a41


Added an example framework using dynamic reservation.

Review: https://reviews.apache.org/r/37168/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58b3a7a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58b3a7a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58b3a7a4

Branch: refs/heads/master
Commit: 58b3a7a414293df7074b6a5e5aeda4c81a141d10
Parents: 21937f9
Author: Klaus Ma <kl...@gmail.com>
Authored: Fri Apr 8 14:47:45 2016 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Fri Apr 8 15:09:03 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   6 +
 src/examples/dynamic_reservation_framework.cpp  | 403 +++++++++++++++++++
 src/tests/dynamic_reservation_framework_test.sh |  35 ++
 src/tests/examples_tests.cpp                    |   6 +-
 4 files changed, 446 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b7f3f36..4375b03 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1652,6 +1652,11 @@ clean-python:
 PHONY_TARGETS += clean-python
 
 # Test (make check) binaries.
+check_PROGRAMS += dynamic-reservation-framework
+dynamic_reservation_framework_SOURCES = examples/dynamic_reservation_framework.cpp
+dynamic_reservation_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+dynamic_reservation_framework_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += test-http-framework
 test_http_framework_SOURCES = examples/test_http_framework.cpp
 test_http_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -2048,6 +2053,7 @@ EXTRA_DIST +=							\
 
 dist_check_SCRIPTS +=						\
   tests/balloon_framework_test.sh				\
+  tests/dynamic_reservation_framework_test.sh			\
   tests/test_http_framework_test.sh				\
   tests/java_exception_test.sh					\
   tests/java_framework_test.sh					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/examples/dynamic_reservation_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/dynamic_reservation_framework.cpp b/src/examples/dynamic_reservation_framework.cpp
new file mode 100644
index 0000000..b51b42b
--- /dev/null
+++ b/src/examples/dynamic_reservation_framework.cpp
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/resources.hpp>
+#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <mesos/authorizer/acls.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+using namespace mesos;
+
+using std::cerr;
+using std::endl;
+using std::string;
+using std::vector;
+
+const int32_t CPUS_PER_TASK = 1;
+const int32_t MEM_PER_TASK = 128;
+
+// The framework reserves resources to run at most one task at a time
+// on each agent; the resources are reserved when they are offered to
+// the framework for the first time, and are unreserved when all tasks
+// are done. The framework aborts if any task is lost, killed, or failed.
+class DynamicReservationScheduler : public Scheduler
+{
+public:
+  DynamicReservationScheduler(
+      const string& _command,
+      const string& _role,
+      const string& _principal)
+    : command(_command),
+      role(_role),
+      tasksLaunched(0),
+      tasksFinished(0),
+      totalTasks(5),
+      principal(_principal)
+  {
+    reservationInfo.set_principal(principal);
+    taskResources = TASK_RESOURCES.flatten(role, reservationInfo);
+  }
+
+  virtual ~DynamicReservationScheduler() {}
+
+  virtual void registered(SchedulerDriver*,
+                          const FrameworkID&,
+                          const MasterInfo&)
+  {
+    LOG(INFO) << "Registered!";
+  }
+
+  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {}
+
+  virtual void disconnected(SchedulerDriver* driver) {}
+
+  virtual void resourceOffers(SchedulerDriver* driver,
+                              const vector<Offer>& offers)
+  {
+    foreach (const Offer& offer, offers) {
+      LOG(INFO) << "Received offer " << offer.id() << " with "
+                << offer.resources();
+
+      // If this is the first offer the framework has seen from this agent,
+      // its state is `State::INIT`; the framework will reserve resources
+      // from it (by sending a RESERVE operation to the master) in this loop.
+      if (!states.contains(offer.slave_id())) {
+        // If all tasks were launched, do not reserve more resources; wait
+        // for them to finish and unreserve resources.
+        if (tasksLaunched == totalTasks) {
+          continue;
+        }
+
+        states[offer.slave_id()] = State::INIT;
+      };
+
+      const State state = states[offer.slave_id()];
+
+      switch (state) {
+        case State::INIT: {
+          // Framework reserves resources from this offer for only one task;
+          // the task'll be dispatched when reserved resources are re-offered
+          // to this framework.
+          Resources resources = offer.resources();
+          Resources unreserved = resources.unreserved();
+          if (!unreserved.contains(TASK_RESOURCES)) {
+            LOG(INFO) << "Failed to reserve resources for task in offer "
+                      << stringify(offer.id());
+            break;
+          }
+          driver->acceptOffers({offer.id()}, {RESERVE(taskResources)});
+          states[offer.slave_id()] = State::RESERVING;
+          break;
+        }
+        case State::RESERVING: {
+          Resources resources = offer.resources();
+          Resources reserved = resources.reserved(role);
+          if (!reserved.contains(taskResources)) {
+            break;
+          }
+          states[offer.slave_id()] = State::RESERVED;
+
+          // We fall through here to save an offer cycle.
+          // [[fallthrough]]
+        }
+        case State::RESERVED: {
+          Resources resources = offer.resources();
+          Resources reserved = resources.reserved(role);
+
+          CHECK(reserved.contains(taskResources));
+
+          // If all tasks were launched, unreserve those resources.
+          if (tasksLaunched == totalTasks) {
+            driver->acceptOffers({offer.id()}, {UNRESERVE(taskResources)});
+            states[offer.slave_id()] = State::UNRESERVING;
+            break;
+          }
+
+          // Framework dispatches task on the reserved resources.
+          CHECK(tasksLaunched < totalTasks);
+
+          // Launch tasks on reserved resources.
+          const string& taskId = stringify(tasksLaunched++);
+          LOG(INFO) << "Launching task " << taskId << " using offer "
+                    << offer.id();
+          TaskInfo task;
+          task.set_name("Task " + taskId + ": " + command);
+          task.mutable_task_id()->set_value(taskId);
+          task.mutable_slave_id()->MergeFrom(offer.slave_id());
+          task.mutable_command()->set_shell(true);
+          task.mutable_command()->set_value(command);
+          task.mutable_resources()->MergeFrom(taskResources);
+          driver->launchTasks(offer.id(), {task});
+          states[offer.slave_id()] = State::TASK_RUNNING;
+          break;
+        }
+        case State::TASK_RUNNING:
+          LOG(INFO) << "The task on " << offer.slave_id()
+                    << " is running, waiting for task done";
+          break;
+        case State::UNRESERVING: {
+          Resources resources = offer.resources();
+          Resources reserved = resources.reserved(role);
+          if (!reserved.contains(taskResources)) {
+            states[offer.slave_id()] = State::UNRESERVED;
+          }
+          break;
+        }
+        case State::UNRESERVED:
+          // If state of slave is UNRESERVED, ignore it. The driver is stopped
+          // when all tasks are done and all resources are unreserved.
+          break;
+      }
+    }
+
+    // If all tasks were done and all resources were unreserved,
+    // stop the driver.
+    if (tasksFinished == totalTasks) {
+      // If all resources were unreserved, stop the driver.
+      foreachvalue (const State& state, states) {
+        if (state != State::UNRESERVED) {
+          return;
+        }
+      }
+
+      driver->stop();
+    }
+  }
+
+  virtual void offerRescinded(SchedulerDriver* driver,
+                              const OfferID& offerId) {}
+
+  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
+  {
+    const string& taskId = status.task_id().value();
+
+    if (status.state() == TASK_FINISHED) {
+      ++tasksFinished;
+      // Mark state of slave as RESERVED, so other tasks can reuse it.
+      CHECK(states[status.slave_id()] == State::TASK_RUNNING);
+      states[status.slave_id()] = State::RESERVED;
+      LOG(INFO) << "Task " << taskId << " is finished at slave "
+                << status.slave_id();
+    } else {
+      LOG(INFO) << "Task " << taskId << " is in state " << status.state();
+    }
+
+    if (status.state() == TASK_LOST ||
+        status.state() == TASK_KILLED ||
+        status.state() == TASK_FAILED) {
+      LOG(INFO) << "Aborting because task " << taskId
+                << " is in unexpected state " << status.state()
+                << " with reason " << status.reason()
+                << " from source " << status.source()
+                << " with message '" << status.message() << "'";
+      driver->abort();
+    }
+
+    if (tasksFinished == totalTasks) {
+      LOG(INFO) << "All tasks done, waiting for unreserving resources";
+    }
+  }
+
+  virtual void frameworkMessage(SchedulerDriver* driver,
+                                const ExecutorID& executorId,
+                                const SlaveID& slaveId,
+                                const string& data) {}
+
+  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId) {}
+
+  virtual void executorLost(SchedulerDriver* driver,
+                            const ExecutorID& executorId,
+                            const SlaveID& slaveId,
+                            int status) {}
+
+  virtual void error(SchedulerDriver* driver, const string& message)
+  {
+    LOG(ERROR) << message;
+  }
+
+private:
+  string command;
+  string role;
+  int tasksLaunched;
+  int tasksFinished;
+  int totalTasks;
+  string principal;
+
+
+  //              | Next State    | Action
+  // ------------------------------------------
+  // INIT         | RESERVING     | Transfer to RESERVING after sending
+  //              |               | reserve operation to master.
+  // ------------------------------------------
+  // RESERVING    | RESERVED      | Transfer to RESERVED once the reserved
+  //              |               | resources are offered by the master.
+  // ------------------------------------------
+  // RESERVED     | TASK_RUNNING  | Transfer to TASK_RUNNING after launching
+  //              |               | on reserved resources.
+  //              | UNRESERVING   | Transfer to UNRESERVING if all tasks are
+  //              |               | launched; sending unreserve operation to
+  //              |               | master.
+  // ------------------------------------------
+  // TASK_RUNNING | RESERVED      | Transfer to RESERVED when the running
+  //              |               | task finishes; the resources are reused
+  //              |               | by other tasks.
+  // ------------------------------------------
+  // UNRESERVING  | UNRESERVED    | Transfer to UNRESERVED if the offer did
+  //              |               | not include reserved resources.
+  // ------------------------------------------
+  // UNRESERVED   | -             | If all resources are unreserved, stop
+  //              |               | the driver.
+
+  enum State {
+    INIT,         // The framework receives the offer for the first time.
+    RESERVING,    // The framework sent the RESERVE request to master.
+    RESERVED,     // The framework got reserved resources from master.
+    TASK_RUNNING, // The task was dispatched to master.
+    UNRESERVING,  // The framework sent the UNRESERVE request to master.
+    UNRESERVED,   // The resources were unreserved.
+  };
+
+  hashmap<SlaveID, State> states;
+  Resource::ReservationInfo reservationInfo;
+  Resources taskResources;
+
+  static const Resources TASK_RESOURCES;
+
+  Offer::Operation RESERVE(Resources resources)
+  {
+    Offer::Operation operation;
+    operation.set_type(Offer::Operation::RESERVE);
+    operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
+    return operation;
+  }
+
+  Offer::Operation UNRESERVE(Resources resources)
+  {
+    Offer::Operation operation;
+    operation.set_type(Offer::Operation::UNRESERVE);
+    operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
+    return operation;
+  }
+};
+
+
+const Resources DynamicReservationScheduler::TASK_RESOURCES = Resources::parse(
+    "cpus:" + stringify(CPUS_PER_TASK) +
+    ";mem:" + stringify(MEM_PER_TASK)).get();
+
+
+class Flags : public virtual flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    add(&master,
+        "master",
+        "The master to connect to. May be one of:\n"
+        "  master@addr:port (The PID of the master)\n"
+        "  zk://host1:port1,host2:port2,.../path\n"
+        "  zk://username:password@host1:port1,host2:port2,.../path\n"
+        "  file://path/to/file (where file contains one of the above)");
+
+    add(&role,
+        "role",
+        "Role to use when registering");
+
+    add(&principal,
+        "principal",
+        "The principal used to identify this framework",
+        "test");
+
+    add(&command,
+        "command",
+        "The command to run for each task.",
+        "echo hello");
+  }
+
+  Option<string> master;
+  Option<string> role;
+  string principal;
+  string command;
+};
+
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  Try<Nothing> load = flags.load(None(), argc, argv);
+  if (load.isError()) {
+    EXIT(EXIT_FAILURE) << flags.usage(load.error());
+  } else if (flags.master.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing --master");
+  } else if (flags.role.isNone()) {
+    EXIT(EXIT_FAILURE) << flags.usage("Missing --role");
+  } else if (flags.role.get() == "*") {
+    EXIT(EXIT_FAILURE)
+      << flags.usage("Role is incorrect; the default '*' role cannot be used");
+  }
+
+  FrameworkInfo framework;
+  framework.set_user(""); // Mesos'll fill in the current user.
+  framework.set_name("Dynamic Reservation Framework (C++)");
+  framework.set_role(flags.role.get());
+  framework.set_principal(flags.principal);
+
+  DynamicReservationScheduler scheduler(
+      flags.command,
+      flags.role.get(),
+      flags.principal);
+
+  if (flags.master.get() == "local") {
+    // Configure master.
+    os::setenv("MESOS_AUTHENTICATE", "false");
+
+    ACLs acls;
+    ACL::RegisterFramework* acl = acls.add_register_frameworks();
+    acl->mutable_principals()->set_type(ACL::Entity::ANY);
+    acl->mutable_roles()->add_values(flags.role.get());
+    os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
+  }
+
+  MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+      &scheduler,
+      framework,
+      flags.master.get());
+
+  int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
+
+  // Ensure that the driver process terminates.
+  driver->stop();
+
+  delete driver;
+  return status;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/tests/dynamic_reservation_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/dynamic_reservation_framework_test.sh b/src/tests/dynamic_reservation_framework_test.sh
new file mode 100755
index 0000000..00a10f1
--- /dev/null
+++ b/src/tests/dynamic_reservation_framework_test.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set local Mesos runner to use 3 slaves
+export MESOS_NUM_SLAVES=3
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the dynamic reservation framework executes without crashing (returns 0).
+exec ${MESOS_BUILD_DIR}/src/dynamic-reservation-framework --master=local --role=test

http://git-wip-us.apache.org/repos/asf/mesos/blob/58b3a7a4/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index 6ddac17..ac513ce 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -18,19 +18,18 @@
 
 #include "tests/script.hpp"
 
-
 // Run each of the sample frameworks in local mode.
 TEST_SCRIPT(ExamplesTest, TestFramework, "test_framework_test.sh")
 TEST_SCRIPT(ExamplesTest, NoExecutorFramework, "no_executor_framework_test.sh")
 
-
 TEST_SCRIPT(ExamplesTest, TestHTTPFramework,
             "test_http_framework_test.sh")
 
-
 TEST_SCRIPT(ExamplesTest, PersistentVolumeFramework,
             "persistent_volume_framework_test.sh")
 
+TEST_SCRIPT(ExamplesTest, DynamicReservationFramework,
+            "dynamic_reservation_framework_test.sh")
 
 #ifdef MESOS_HAS_JAVA
 TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
@@ -38,7 +37,6 @@ TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaLog, "java_log_test.sh")
 #endif
 
-
 #ifdef MESOS_HAS_PYTHON
 TEST_SCRIPT(ExamplesTest, PythonFramework, "python_framework_test.sh")
 #endif