You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/04/10 23:57:01 UTC
[1/2] mesos git commit: Added an example framework for testing
persistent volumes.
Repository: mesos
Updated Branches:
refs/heads/master 743e9e739 -> 4b15b9608
Added an example framework for testing persistent volumes.
Review: https://reviews.apache.org/r/32984
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b15b960
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b15b960
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b15b960
Branch: refs/heads/master
Commit: 4b15b96089996243f040cc8e6956fae032511417
Parents: 2037099
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Apr 8 11:30:51 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Apr 10 14:56:36 2015 -0700
----------------------------------------------------------------------
src/Makefile.am | 6 +
src/examples/persistent_volume_framework.cpp | 497 +++++++++++++++++++++
src/tests/examples_tests.cpp | 3 +
src/tests/persistent_volume_framework_test.sh | 28 ++
4 files changed, 534 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index fa609da..d15a373 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1276,6 +1276,11 @@ load_generator_framework_SOURCES = examples/load_generator_framework.cpp
load_generator_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
load_generator_framework_LDADD = libmesos.la $(LDADD)
+# Example framework used to test persistent volumes.
+check_PROGRAMS += persistent-volume-framework
+persistent_volume_framework_SOURCES = examples/persistent_volume_framework.cpp
+persistent_volume_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+persistent_volume_framework_LDADD = libmesos.la $(LDADD)
+
if OS_LINUX
check_PROGRAMS += setns-test-helper
setns_test_helper_SOURCES = \
@@ -1480,6 +1485,7 @@ dist_check_SCRIPTS += \
tests/java_framework_test.sh \
tests/java_log_test.sh \
tests/no_executor_framework_test.sh \
+ tests/persistent_volume_framework_test.sh \
tests/python_framework_test.sh \
tests/test_framework_test.sh
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/examples/persistent_volume_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/persistent_volume_framework.cpp b/src/examples/persistent_volume_framework.cpp
new file mode 100644
index 0000000..8a893fc
--- /dev/null
+++ b/src/examples/persistent_volume_framework.cpp
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <sstream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <stout/flags.hpp>
+#include <stout/format.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/uuid.hpp>
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+
+using std::cout;
+using std::endl;
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+
+// Returns the resources a shard asks for before its persistent volume
+// exists ("cpus:0.1;mem:32;disk:16"), parsed for the given 'role'.
+//
+// TODO(jieyu): Currently, persistent volume is only allowed for
+// reserved resources.
+static Resources SHARD_INITIAL_RESOURCES(const string& role)
+{
+  Try<Resources> initial = Resources::parse("cpus:0.1;mem:32;disk:16", role);
+  return initial.get();
+}
+
+
+// Builds the persistent volume resource of a shard: a "disk" resource
+// of size 8 for 'role' whose DiskInfo carries the persistence ID
+// 'persistenceId' and a read-write volume mounted at 'containerPath'.
+static Resource SHARD_PERSISTENT_VOLUME(
+    const string& role,
+    const string& persistenceId,
+    const string& containerPath)
+{
+  // Describe the persistence and the volume directly on the DiskInfo.
+  Resource::DiskInfo diskInfo;
+  diskInfo.mutable_persistence()->set_id(persistenceId);
+  diskInfo.mutable_volume()->set_container_path(containerPath);
+  diskInfo.mutable_volume()->set_mode(Volume::RW);
+
+  // Attach the DiskInfo to a plain disk resource.
+  Resource disk = Resources::parse("disk", "8", role).get();
+  disk.mutable_disk()->CopyFrom(diskInfo);
+
+  return disk;
+}
+
+
+// Wraps 'volumes' into an offer operation of type CREATE.
+static Offer::Operation CREATE(const Resources& volumes)
+{
+  Offer::Operation create;
+
+  create.set_type(Offer::Operation::CREATE);
+  create.mutable_create()->mutable_volumes()->CopyFrom(volumes);
+
+  return create;
+}
+
+
+// Wraps 'tasks' into an offer operation of type LAUNCH. The task infos
+// are copied into the operation in the order given.
+static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
+{
+  Offer::Operation launch;
+  launch.set_type(Offer::Operation::LAUNCH);
+
+  for (const TaskInfo& info : tasks) {
+    launch.mutable_launch()->add_task_infos()->CopyFrom(info);
+  }
+
+  return launch;
+}
+
+
+// An example scheduler that exercises persistent volumes.
+//
+// The scheduler manages a fixed number of "shards". The first task of
+// a shard is launched together with the creation of a persistent
+// volume and writes a file into it ('touch volume/persisted'). Every
+// later task of the shard is launched with the same resources
+// (including the volume) and checks that the file is still there
+// ('test -f volume/persisted'). A new task of a shard is only
+// launched once the previous one has finished. The driver is stopped
+// once all shards are done and aborted on any unexpected task state.
+class PersistentVolumeScheduler : public Scheduler
+{
+public:
+  // Creates 'numShards' shards named "shard-<i>". Each shard uses
+  // resources of the role set in '_frameworkInfo' and is supposed to
+  // launch 'tasksPerShard' tasks.
+  PersistentVolumeScheduler(
+      const FrameworkInfo& _frameworkInfo,
+      size_t numShards,
+      size_t tasksPerShard)
+    : frameworkInfo(_frameworkInfo)
+  {
+    for (size_t i = 0; i < numShards; i++) {
+      shards.push_back(Shard(
+          "shard-" + stringify(i),
+          frameworkInfo.role(),
+          tasksPerShard));
+    }
+  }
+
+  // Logs the registration and remembers the assigned framework ID.
+  virtual void registered(
+      SchedulerDriver* driver,
+      const FrameworkID& frameworkId,
+      const MasterInfo& masterInfo)
+  {
+    LOG(INFO) << "Registered with master " << masterInfo
+              << " and got framework ID " << frameworkId;
+
+    frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+  }
+
+  // Only logs the event.
+  virtual void reregistered(
+      SchedulerDriver* driver,
+      const MasterInfo& masterInfo)
+  {
+    LOG(INFO) << "Reregistered with master " << masterInfo;
+  }
+
+  // Only logs the event.
+  virtual void disconnected(
+      SchedulerDriver* driver)
+  {
+    LOG(INFO) << "Disconnected!";
+  }
+
+  // Uses each offer to advance the shards: a shard in INIT creates
+  // its persistent volume and launches its first task; a shard in
+  // WAITING is re-launched if the offer contains its resources
+  // (including its volume). Every offer is answered with exactly one
+  // acceptOffers() call that carries the accumulated operations
+  // (possibly none).
+  virtual void resourceOffers(
+      SchedulerDriver* driver,
+      const vector<Offer>& offers)
+  {
+    foreach (const Offer& offer, offers) {
+      LOG(INFO) << "Received offer " << offer.id() << " from slave "
+                << offer.slave_id() << " (" << offer.hostname() << ") "
+                << "with " << offer.resources();
+
+      // What is still available in this offer for the shards that
+      // have not been looked at yet.
+      Resources offered = offer.resources();
+
+      // The operations we will perform on the offer.
+      vector<Offer::Operation> operations;
+
+      foreach (Shard& shard, shards) {
+        switch (shard.state) {
+          case Shard::INIT:
+            if (offered.contains(shard.resources)) {
+              Resource volume = SHARD_PERSISTENT_VOLUME(
+                  frameworkInfo.role(),
+                  UUID::random().toString(),
+                  "volume");
+
+              // The resources of the shard once the volume has been
+              // created out of them.
+              Try<Resources> resources = shard.resources.apply(CREATE(volume));
+              CHECK_SOME(resources);
+
+              TaskInfo task;
+              task.set_name(shard.name);
+              task.mutable_task_id()->set_value(UUID::random().toString());
+              task.mutable_slave_id()->CopyFrom(offer.slave_id());
+              task.mutable_resources()->CopyFrom(resources.get());
+              task.mutable_command()->set_value("touch volume/persisted");
+
+              // Update the shard.
+              shard.state = Shard::STAGING;
+              shard.taskId = task.task_id();
+              shard.volume.id = volume.disk().persistence().id();
+              shard.volume.slave = offer.slave_id().value();
+              shard.resources = resources.get();
+              shard.launched++;
+
+              operations.push_back(CREATE(volume));
+              operations.push_back(LAUNCH({task}));
+
+              // Apply the same operations to our view of the offer.
+              resources = offered.apply(vector<Offer::Operation>{
+                  CREATE(volume),
+                  LAUNCH({task})});
+
+              CHECK_SOME(resources);
+              offered = resources.get();
+            }
+            break;
+          case Shard::WAITING:
+            if (offered.contains(shard.resources)) {
+              // The shard's resources contain its volume, so a match
+              // is expected to come from the slave holding the volume.
+              CHECK_EQ(shard.volume.slave, offer.slave_id().value());
+
+              TaskInfo task;
+              task.set_name(shard.name);
+              task.mutable_task_id()->set_value(UUID::random().toString());
+              task.mutable_slave_id()->CopyFrom(offer.slave_id());
+              task.mutable_resources()->CopyFrom(shard.resources);
+              task.mutable_command()->set_value("test -f volume/persisted");
+
+              // Update the shard.
+              shard.state = Shard::STAGING;
+              shard.taskId = task.task_id();
+              shard.launched++;
+
+              operations.push_back(LAUNCH({task}));
+
+              // The task uses the shard's resources from this offer;
+              // deduct them so that another waiting shard is not
+              // launched against the same cpus/mem.
+              offered -= shard.resources;
+            }
+            break;
+          case Shard::STAGING:
+          case Shard::RUNNING:
+          case Shard::DONE:
+            // Ignore the offer.
+            break;
+          default:
+            LOG(ERROR) << "Unexpected shard state: " << shard.state;
+            driver->abort();
+            break;
+        }
+      }
+
+      driver->acceptOffers({offer.id()}, operations);
+    }
+  }
+
+  // Only logs the event.
+  virtual void offerRescinded(
+      SchedulerDriver* driver,
+      const OfferID& offerId)
+  {
+    LOG(INFO) << "Offer " << offerId << " has been rescinded";
+  }
+
+  // Advances the state of the shard that owns the task: RUNNING on
+  // TASK_RUNNING; on TASK_FINISHED either DONE (the shard has
+  // launched all of its tasks) or WAITING (it needs to be launched
+  // again). TASK_STAGING and TASK_STARTING are ignored, any other
+  // task state aborts the driver. Stops the driver once all shards
+  // are DONE.
+  virtual void statusUpdate(
+      SchedulerDriver* driver,
+      const TaskStatus& status)
+  {
+    LOG(INFO) << "Task '" << status.task_id() << "' is in state "
+              << status.state();
+
+    foreach (Shard& shard, shards) {
+      if (shard.taskId == status.task_id()) {
+        switch (status.state()) {
+          case TASK_RUNNING:
+            shard.state = Shard::RUNNING;
+            break;
+          case TASK_FINISHED:
+            if (shard.launched >= shard.tasks) {
+              shard.state = Shard::DONE;
+            } else {
+              shard.state = Shard::WAITING;
+            }
+            break;
+          case TASK_STAGING:
+          case TASK_STARTING:
+            // Ignore the status update.
+            break;
+          default:
+            LOG(ERROR) << "Unexpected task state " << status.state()
+                       << " for task '" << status.task_id() << "'";
+            driver->abort();
+            break;
+        }
+
+        // The shard owning the task has been found; stop searching.
+        break;
+      }
+    }
+
+    // Check the terminal condition.
+    bool terminal = true;
+    foreach (const Shard& shard, shards) {
+      if (shard.state != Shard::DONE) {
+        terminal = false;
+        break;
+      }
+    }
+
+    if (terminal) {
+      driver->stop();
+    }
+  }
+
+  // Only logs the event.
+  virtual void frameworkMessage(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      const string& data)
+  {
+    LOG(INFO) << "Received framework message from executor '" << executorId
+              << "' on slave " << slaveId << ": '" << data << "'";
+  }
+
+  // Only logs the event.
+  virtual void slaveLost(
+      SchedulerDriver* driver,
+      const SlaveID& slaveId)
+  {
+    LOG(INFO) << "Lost slave " << slaveId;
+  }
+
+  // Only logs the event.
+  virtual void executorLost(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int status)
+  {
+    LOG(INFO) << "Lost executor '" << executorId << "' on slave "
+              << slaveId << ", status " << status;
+  }
+
+  // Only logs the error message.
+  virtual void error(
+      SchedulerDriver* driver,
+      const string& message)
+  {
+    LOG(ERROR) << message;
+  }
+
+private:
+  // The unit of work of this framework: a sequence of tasks that share
+  // one persistent volume.
+  struct Shard
+  {
+    enum State
+    {
+      INIT = 0,   // The shard hasn't been launched yet.
+      STAGING,    // The shard has been launched.
+      RUNNING,    // The shard is running.
+      WAITING,    // The shard is waiting to be re-launched.
+      DONE,       // The shard has finished all tasks.
+
+      // TODO(jieyu): Add another state so that we can track the
+      // destroy of the volume once all tasks finish.
+    };
+
+    // The persistent volume associated with this shard.
+    struct Volume
+    {
+      // The persistence ID.
+      string id;
+
+      // An identifier used to uniquely identify a slave (even across
+      // reboot). In the test, we use the slave ID since slaves will not
+      // be rebooted. Note that we cannot use hostname as the identifier
+      // in a local cluster because all slaves share the same hostname.
+      string slave;
+    };
+
+    // Creates a shard in state INIT that requires the initial shard
+    // resources of 'role' and should launch '_tasks' tasks.
+    Shard(const string& _name, const string& role, size_t _tasks)
+      : name(_name),
+        state(INIT),
+        resources(SHARD_INITIAL_RESOURCES(role)),
+        launched(0),
+        tasks(_tasks) {}
+
+    string name;
+    State state;          // The current state of this shard.
+    TaskID taskId;        // The ID of the current task.
+    Volume volume;        // The persistent volume associated with the shard.
+    Resources resources;  // Resources required to launch the shard.
+    size_t launched;      // How many tasks this shard has launched.
+    size_t tasks;         // How many tasks this shard should launch.
+  };
+
+  FrameworkInfo frameworkInfo;
+  vector<Shard> shards;
+};
+
+
+// Command line flags of the example framework, on top of the logging
+// flags it inherits.
+class Flags : public logging::Flags
+{
+public:
+  Option<string> master;   // Where to find the master; no default.
+  string role;             // Role to register with.
+  string principal;        // Principal identifying the framework.
+  size_t num_shards;       // Number of shards to run.
+  size_t tasks_per_shard;  // Number of tasks to launch per shard.
+  bool help;               // Whether to print the help message.
+
+  // Registers every flag with its name, help text and, except for
+  // 'master', its default value.
+  Flags()
+  {
+    add(&master, "master",
+        "The master to connect to. May be one of:\n"
+        " master@addr:port (The PID of the master)\n"
+        " zk://host1:port1,host2:port2,.../path\n"
+        " zk://username:password@host1:port1,host2:port2,.../path\n"
+        " file://path/to/file (where file contains one of the above)");
+
+    add(&role, "role", "Role to use when registering", "test");
+
+    add(&principal, "principal",
+        "The principal used to identify this framework", "test");
+
+    add(&num_shards, "num_shards",
+        "The number of shards the framework will run.", 3);
+
+    add(&tasks_per_shard, "tasks_per_shard",
+        "The number of tasks should be launched per shard.", 3);
+
+    add(&help, "help", "Print this help message", false);
+  }
+};
+
+
+// Returns the help text: a usage line naming the program (the basename
+// of 'argv0'), followed by the supported options as reported by
+// 'flags'.
+static string usage(const char* argv0, const flags::FlagsBase& flags)
+{
+  const string program = os::basename(argv0).get();
+
+  return "Usage: " + program + " [...]\n"
+         "\n"
+         "Supported options:\n" +
+         flags.usage();
+}
+
+
+// Entry point of the example framework.
+//
+// Loads the flags (from the command line and from "MESOS_" prefixed
+// environment variables), sets up the framework info and runs the
+// scheduler driver. If the master is "local", the master and slave
+// of the local cluster are configured through environment variables
+// first.
+//
+// Returns 0 if the driver finished in DRIVER_STOPPED and 1 otherwise.
+// Invalid flags, --help and a missing --master exit with status 1.
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  Try<Nothing> load = flags.load("MESOS_", argc, argv);
+
+  if (load.isError()) {
+    EXIT(1) << load.error() << endl << usage(argv[0], flags);
+  }
+
+  if (flags.help) {
+    EXIT(1) << usage(argv[0], flags);
+  }
+
+  if (flags.master.isNone()) {
+    EXIT(1) << "Missing required option --master. See --help";
+  }
+
+  logging::initialize(argv[0], flags, true); // Catch signals.
+
+  FrameworkInfo framework;
+  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_name("Persistent Volume Framework (C++)");
+  framework.set_role(flags.role);
+  framework.set_checkpoint(true);
+  framework.set_principal(flags.principal);
+
+  if (flags.master.get() == "local") {
+    // Configure master.
+    os::setenv("MESOS_ROLES", flags.role);
+    os::setenv("MESOS_AUTHENTICATE", "false");
+
+    // Allow any principal to register a framework with our role.
+    ACLs acls;
+    ACL::RegisterFramework* acl = acls.add_register_frameworks();
+    acl->mutable_principals()->set_type(ACL::Entity::ANY);
+    acl->mutable_roles()->add_values(flags.role);
+
+    os::setenv("MESOS_ACLS", stringify(JSON::Protobuf(acls)));
+
+    // Configure slave.
+    os::setenv("MESOS_DEFAULT_ROLE", flags.role);
+
+    // Use the directory of this executable as the launcher directory
+    // and add it to the library search paths.
+    const string launcherDir = os::dirname(os::realpath(argv[0]).get()).get();
+    os::setenv("MESOS_LAUNCHER_DIR", launcherDir);
+    os::libraries::appendPaths(launcherDir);
+  }
+
+  PersistentVolumeScheduler scheduler(
+      framework,
+      flags.num_shards,
+      flags.tasks_per_shard);
+
+  // The driver lives in automatic storage instead of being managed
+  // with new/delete. It is declared after 'scheduler', so it is still
+  // destroyed before the scheduler it points to.
+  MesosSchedulerDriver driver(
+      &scheduler,
+      framework,
+      flags.master.get());
+
+  int status = driver.run() == DRIVER_STOPPED ? 0 : 1;
+
+  // Stop the driver explicitly before it goes out of scope.
+  driver.stop();
+
+  return status;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index 5222b6d..f85b815 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -32,6 +32,9 @@ TEST_SCRIPT(ExamplesTest, LowLevelSchedulerLibprocess,
TEST_SCRIPT(ExamplesTest, LowLevelSchedulerPthread,
"low_level_scheduler_pthread_test.sh")
+// Runs the persistent volume example framework through its test script.
+TEST_SCRIPT(ExamplesTest, PersistentVolumeFramework,
+            "persistent_volume_framework_test.sh")
+
#ifdef MESOS_HAS_JAVA
TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/tests/persistent_volume_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_framework_test.sh b/src/tests/persistent_volume_framework_test.sh
new file mode 100755
index 0000000..c96fb70
--- /dev/null
+++ b/src/tests/persistent_volume_framework_test.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+ echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+ exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+ echo "Failed to find MESOS_BUILD_DIR in environment" && \
+ exit 1
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set local Mesos runner to use 3 slaves
+export MESOS_NUM_SLAVES=3
+
+# Check that the framework executes without crashing (returns 0).
+exec ${MESOS_BUILD_DIR}/src/persistent-volume-framework --master=local
[2/2] mesos git commit: Fixed a bug regarding setting work_dir for a
local cluster.
Posted by ji...@apache.org.
Fixed a bug regarding setting work_dir for a local cluster.
Review: https://reviews.apache.org/r/32983
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2037099d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2037099d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2037099d
Branch: refs/heads/master
Commit: 2037099d6d95ef8d99d971e47cfd75968543d972
Parents: 743e9e7
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Apr 8 11:32:28 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Apr 10 14:56:36 2015 -0700
----------------------------------------------------------------------
src/local/local.cpp | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2037099d/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 1908336..289b9bc 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -287,6 +287,9 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
<< "slave flags from the environment: " << load.error();
}
+ // Use a different work directory for each slave.
+ flags.work_dir = path::join(flags.work_dir, stringify(i));
+
garbageCollectors->push_back(new GarbageCollector());
statusUpdateManagers->push_back(new StatusUpdateManager(flags));
fetchers->push_back(new Fetcher());
@@ -298,9 +301,6 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
}
- // Use a different work directory for each slave.
- flags.work_dir = path::join(flags.work_dir, stringify(i));
-
// NOTE: At this point detector is already initialized by the
// Master.
Slave* slave = new Slave(