Posted to commits@mesos.apache.org by ya...@apache.org on 2016/04/09 01:52:08 UTC
[4/6] mesos git commit: Add XFS disk isolator tests.
Add XFS disk isolator tests.
Review: https://reviews.apache.org/r/44949/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/255710b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/255710b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/255710b7
Branch: refs/heads/master
Commit: 255710b7c95e578c873e1317e3705a55e81b1f61
Parents: 04be1d0
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 8 14:53:56 2016 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Fri Apr 8 16:46:08 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 4 +-
src/slave/containerizer/mesos/containerizer.cpp | 7 +
.../containerizer/mesos/isolators/xfs/disk.cpp | 437 +++++++++++++++++++
.../containerizer/mesos/isolators/xfs/disk.hpp | 107 +++++
.../containerizer/mesos/isolators/xfs/utils.cpp | 6 +
.../containerizer/mesos/isolators/xfs/utils.hpp | 3 +
src/slave/flags.cpp | 7 +
src/slave/flags.hpp | 3 +
src/tests/containerizer/xfs_quota_tests.cpp | 425 +++++++++++++++++-
9 files changed, 997 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a16c2da..dc8f8e3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -899,7 +899,9 @@ MESOS_LINUX_FILES += \
if ENABLE_XFS_DISK_ISOLATOR
MESOS_LINUX_FILES += \
slave/containerizer/mesos/isolators/xfs/utils.cpp \
- slave/containerizer/mesos/isolators/xfs/utils.hpp
+ slave/containerizer/mesos/isolators/xfs/utils.hpp \
+ slave/containerizer/mesos/isolators/xfs/disk.cpp \
+ slave/containerizer/mesos/isolators/xfs/disk.hpp
endif
MESOS_NETWORK_ISOLATOR_FILES = \
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index a5dd223..c25fa92 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -55,6 +55,10 @@
#include "slave/containerizer/mesos/isolators/posix/disk.hpp"
+#if ENABLE_XFS_DISK_ISOLATOR
+#include "slave/containerizer/mesos/isolators/xfs/disk.hpp"
+#endif
+
#ifdef __linux__
#include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
#include "slave/containerizer/mesos/isolators/cgroups/mem.hpp"
@@ -215,6 +219,9 @@ Try<MesosContainerizer*> MesosContainerizer::create(
{"posix/cpu", &PosixCpuIsolatorProcess::create},
{"posix/mem", &PosixMemIsolatorProcess::create},
{"posix/disk", &PosixDiskIsolatorProcess::create},
+#if ENABLE_XFS_DISK_ISOLATOR
+ {"xfs/disk", &XfsDiskIsolatorProcess::create},
+#endif
#ifdef __linux__
{"cgroups/cpu", &CgroupsCpushareIsolatorProcess::create},
{"cgroups/mem", &CgroupsMemIsolatorProcess::create},
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/disk.cpp b/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
new file mode 100644
index 0000000..2f65f0a
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "slave/containerizer/mesos/isolators/xfs/disk.hpp"
+
+#include <glog/logging.h>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/os.hpp>
+
+#include <stout/os/stat.hpp>
+
+#include "slave/paths.hpp"
+
+using std::list;
+using std::string;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::PID;
+using process::Process;
+using process::Promise;
+
+using mesos::slave::ContainerConfig;
+using mesos::slave::ContainerLaunchInfo;
+using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerState;
+using mesos::slave::Isolator;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+static Try<IntervalSet<prid_t>> getIntervalSet(
+ const Value::Ranges& ranges)
+{
+ IntervalSet<prid_t> set;
+
+ for (int i = 0; i < ranges.range_size(); i++) {
+ if (ranges.range(i).end() > std::numeric_limits<prid_t>::max()) {
+ return Error("Project ID " + stringify(ranges.range(i).end()) +
+ " is out of range");
+ }
+
+ set += (Bound<prid_t>::closed(ranges.range(i).begin()),
+ Bound<prid_t>::closed(ranges.range(i).end()));
+ }
+
+ return set;
+}
+
+
+static Option<Bytes> getDiskResource(
+ const Resources& resources)
+{
+ Option<Bytes> bytes = None();
+
+ foreach (const Resource& resource, resources) {
+ if (resource.name() != "disk") {
+ continue;
+ }
+
+ // TODO(jpeach): Ignore persistent volume resources. The problem here is
+ // that we need to guarantee that we can track the removal of every
+ // directory for which we assign a project ID. Since the destruction of
+ // persistent volumes is not visible to the isolator, we don't want to risk
+ // leaking the project ID, or spuriously reusing it.
+ if (Resources::isPersistentVolume(resource)) {
+ continue;
+ }
+
+ if (resource.has_disk() && resource.disk().has_volume()) {
+ continue;
+ }
+
+ if (bytes.isSome()) {
+ bytes.get() += Megabytes(resource.scalar().value());
+ } else {
+ bytes = Megabytes(resource.scalar().value());
+ }
+ }
+
+ return bytes;
+}
+
+
+Try<Isolator*> XfsDiskIsolatorProcess::create(const Flags& flags)
+{
+ if (!xfs::pathIsXfs(flags.work_dir)) {
+ return Error("'" + flags.work_dir + "' is not an XFS filesystem");
+ }
+
+ Result<uid_t> uid = os::getuid();
+ CHECK_SOME(uid) << "getuid(2) doesn't fail";
+
+ if (uid.get() != 0) {
+ return Error("The XFS disk isolator requires running as root.");
+ }
+
+ Try<Resource> projects =
+ Resources::parse("projects", flags.xfs_project_range, "*");
+
+ if (projects.isError()) {
+ return Error(
+ "Failed to parse XFS project range '" +
+ flags.xfs_project_range +
+ "'");
+ }
+
+ if (projects.get().type() != Value::RANGES) {
+ return Error(
+ "Invalid XFS project resource type " +
+ mesos::Value_Type_Name(projects.get().type()) +
+ ", expecting " +
+ mesos::Value_Type_Name(Value::RANGES));
+ }
+
+ Try<IntervalSet<prid_t>> totalProjectIds =
+ getIntervalSet(projects.get().ranges());
+
+ if (totalProjectIds.isError()) {
+ return Error(totalProjectIds.error());
+ }
+
+ Option<Error> status = xfs::validateProjectIds(totalProjectIds.get());
+ if (status.isSome()) {
+ return Error(status->message);
+ }
+
+ return new MesosIsolator(Owned<MesosIsolatorProcess>(
+ new XfsDiskIsolatorProcess(flags, totalProjectIds.get())));
+}
+
+
+XfsDiskIsolatorProcess::XfsDiskIsolatorProcess(
+ const Flags& _flags,
+ const IntervalSet<prid_t>& projectIds)
+ : flags(_flags),
+ totalProjectIds(projectIds),
+ freeProjectIds(projectIds)
+{
+ // At the beginning, the free project range is the same as the
+ // configured project range.
+
+ LOG(INFO) << "Allocating XFS project IDs from the range " << totalProjectIds;
+}
+
+
+XfsDiskIsolatorProcess::~XfsDiskIsolatorProcess() {}
+
+
+Future<Nothing> XfsDiskIsolatorProcess::recover(
+ const list<ContainerState>& states,
+ const hashset<ContainerID>& orphans)
+{
+ // We don't need to explicitly deal with orphans since we are primarily
+ // concerned with the on-disk state. We scan all the sandbox directories
+ // for project IDs that we have not recovered and make a best effort to
+ // remove all the corresponding on-disk state.
+ Try<std::list<std::string>> sandboxes = os::glob(path::join(
+ paths::getSandboxRootDir(flags.work_dir),
+ "*",
+ "frameworks",
+ "*",
+ "executors",
+ "*",
+ "runs",
+ "*"));
+
+ if (sandboxes.isError()) {
+ return Failure("Failed to scan sandbox directories: " + sandboxes.error());
+ }
+
+ hashset<ContainerID> alive;
+
+ foreach (const ContainerState& state, states) {
+ alive.insert(state.container_id());
+ }
+
+ foreach (const string& sandbox, sandboxes.get()) {
+ // Skip the "latest" symlink.
+ if (os::stat::islink(sandbox)) {
+ continue;
+ }
+
+ ContainerID containerId;
+ containerId.set_value(Path(sandbox).basename());
+
+ CHECK(!infos.contains(containerId)) << "ContainerIDs should never collide";
+
+ // We fail the isolator recovery upon failure in any container because
+ // failing to get the project ID usually suggests some fatal issue on the
+ // host.
+ Result<prid_t> projectId = xfs::getProjectId(sandbox);
+ if (projectId.isError()) {
+ return Failure(projectId.error());
+ }
+
+ // If there is no project ID, don't worry about it. This can happen the
+ // first time an operator enables the XFS disk isolator and we recover a
+ // set of containers that we did not isolate.
+ if (projectId.isNone()) {
+ continue;
+ }
+
+ infos.put(containerId, Owned<Info>(new Info(sandbox, projectId.get())));
+ freeProjectIds -= projectId.get();
+
+ // If this is a known orphan, the containerizer will send a cleanup call
+ // later. If this is a live container, we will manage it. Otherwise, we have
+ // to dispatch a cleanup ourselves. Note that we don't wait for the result
+ // of the cleanups as we don't want to block agent recovery for unknown
+ // orphans.
+ if (!orphans.contains(containerId) && !alive.contains(containerId)) {
+ dispatch(self(), &XfsDiskIsolatorProcess::cleanup, containerId);
+ }
+ }
+
+ return Nothing();
+}
+
+
+// We want to assign the project ID as early as possible. New inodes
+// automatically inherit the project ID of their parent directory, so
+// assigning it early saves the work of manually applying the ID to a
+// lot of existing files.
+Future<Option<ContainerLaunchInfo>> XfsDiskIsolatorProcess::prepare(
+ const ContainerID& containerId,
+ const ContainerConfig& containerConfig)
+{
+ if (infos.contains(containerId)) {
+ return Failure("Container has already been prepared");
+ }
+
+ Option<prid_t> projectId = nextProjectId();
+ if (projectId.isNone()) {
+ return Failure("Failed to assign project ID, range exhausted");
+ }
+
+ // Keep a record of this container so that cleanup() can remove it if
+ // we fail to assign the project ID.
+ infos.put(
+ containerId,
+ Owned<Info>(new Info(containerConfig.directory(), projectId.get())));
+
+ Try<Nothing> status = xfs::setProjectId(
+ containerConfig.directory(), projectId.get());
+
+ if (status.isError()) {
+ return Failure(
+ "Failed to assign project " + stringify(projectId.get()) + ": " +
+ status.error());
+ }
+
+ LOG(INFO) << "Assigned project " << stringify(projectId.get()) << " to '"
+ << containerConfig.directory() << "'";
+
+ return update(containerId, containerConfig.executor_info().resources())
+ .then([]() -> Future<Option<ContainerLaunchInfo>> {
+ return None();
+ });
+}
+
+
+Future<Nothing> XfsDiskIsolatorProcess::isolate(
+ const ContainerID& containerId,
+ pid_t pid)
+{
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ return Nothing();
+}
+
+
+Future<ContainerLimitation> XfsDiskIsolatorProcess::watch(
+ const ContainerID& containerId)
+{
+ // We have nothing to do here, since the XFS quota is enforcing
+ // the limitation.
+ return Future<ContainerLimitation>();
+}
+
+
+Future<Nothing> XfsDiskIsolatorProcess::update(
+ const ContainerID& containerId,
+ const Resources& resources)
+{
+ CHECK(infos.contains(containerId));
+
+ const Owned<Info>& info = infos[containerId];
+
+ Option<Bytes> needed = getDiskResource(resources);
+ if (needed.isNone()) {
+ // TODO(jpeach) If there's no disk resource attached, we should set the
+ // minimum quota (1 block), since a zero quota would be unconstrained.
+ LOG(WARNING) << "Ignoring quota update with no disk resources";
+ return Nothing();
+ }
+
+ // Only update the disk quota if it has changed.
+ if (needed.get() != info->quota) {
+ Try<Nothing> status =
+ xfs::setProjectQuota(info->directory, info->projectId, needed.get());
+
+ if (status.isError()) {
+ return Failure("Failed to update quota for project " +
+ stringify(info->projectId) + ": " + status.error());
+ }
+
+ info->quota = needed.get();
+
+ LOG(INFO) << "Set quota on container " << containerId
+ << " for project " << info->projectId
+ << " to " << info->quota;
+ }
+
+ return Nothing();
+}
+
+
+Future<ResourceStatistics> XfsDiskIsolatorProcess::usage(
+ const ContainerID& containerId)
+{
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ ResourceStatistics statistics;
+ const Owned<Info>& info = infos[containerId];
+
+ Result<xfs::QuotaInfo> quota = xfs::getProjectQuota(
+ info->directory, info->projectId);
+
+ if (quota.isError()) {
+ return Failure(quota.error());
+ }
+
+ if (quota.isSome()) {
+ statistics.set_disk_limit_bytes(quota.get().limit.bytes());
+ statistics.set_disk_used_bytes(quota.get().used.bytes());
+ }
+
+ return statistics;
+}
+
+
+// Remove all the quota state that was created for this container. We
+// make a best effort to remove all the state we can, so we keep going
+// even if one operation fails so that we can remove subsequent state.
+Future<Nothing> XfsDiskIsolatorProcess::cleanup(const ContainerID& containerId)
+{
+ if (!infos.contains(containerId)) {
+ LOG(INFO) << "Ignoring cleanup for unknown container " << containerId;
+ return Nothing();
+ }
+
+ // Take a copy of the Info we are removing so that we can use it
+ // to construct the Failure message if necessary.
+ const Info info = *infos[containerId];
+
+ infos.erase(containerId);
+
+ LOG(INFO) << "Removing project ID " << info.projectId
+ << " from '" << info.directory << "'";
+
+ Try<Nothing> quotaStatus = xfs::clearProjectQuota(
+ info.directory, info.projectId);
+
+ if (quotaStatus.isError()) {
+ LOG(ERROR) << "Failed to clear quota for '"
+ << info.directory << "': " << quotaStatus.error();
+ }
+
+ Try<Nothing> projectStatus = xfs::clearProjectId(info.directory);
+ if (projectStatus.isError()) {
+ LOG(ERROR) << "Failed to remove project ID "
+ << info.projectId
+ << " from '" << info.directory << "': "
+ << projectStatus.error();
+ }
+
+ // If we failed to remove the on-disk project ID we can't reclaim it
+ // because the quota would then be applied across two containers. This
+ // would be a project ID leak, but we could recover it at GC time if
+ // that was visible to isolators.
+ if (quotaStatus.isError() || projectStatus.isError()) {
+ freeProjectIds -= info.projectId;
+ return Failure("Failed to cleanup '" + info.directory + "'");
+ } else {
+ returnProjectId(info.projectId);
+ return Nothing();
+ }
+}
+
+
+Option<prid_t> XfsDiskIsolatorProcess::nextProjectId()
+{
+ if (freeProjectIds.empty()) {
+ return None();
+ }
+
+ prid_t projectId = freeProjectIds.begin()->lower();
+
+ freeProjectIds -= projectId;
+ return projectId;
+}
+
+void XfsDiskIsolatorProcess::returnProjectId(
+ prid_t projectId)
+{
+ // Only return this project ID to the free range if it is in the total
+ // range. This could happen if the total range is changed by the operator
+ // and we recover a previous container from the old range.
+ if (totalProjectIds.contains(projectId)) {
+ freeProjectIds += projectId;
+ }
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
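
The (Bound::closed(begin), Bound::closed(end)) expression in getIntervalSet() above relies on stout's overloaded comma operator, which turns a pair of Bounds into an Interval that can be added to an IntervalSet. A minimal standalone sketch of that API, assuming the stout headers bundled with Mesos are on the include path:

#include <iostream>

#include <stout/interval.hpp>

int main()
{
  IntervalSet<int> set;

  // The parenthesized pair is stout's comma operator building the
  // closed interval [5000, 5009] and adding it to the set.
  set += (Bound<int>::closed(5000), Bound<int>::closed(5009));

  // Removing a single value splits the interval, just as taking a
  // project ID out of freeProjectIds does in the isolator.
  set -= 5005;

  std::cout << set.contains(5005) << std::endl; // 0
  std::cout << set.contains(5006) << std::endl; // 1

  return 0;
}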
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/disk.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/disk.hpp b/src/slave/containerizer/mesos/isolators/xfs/disk.hpp
new file mode 100644
index 0000000..822de65
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/xfs/disk.hpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __XFS_DISK_ISOLATOR_HPP__
+#define __XFS_DISK_ISOLATOR_HPP__
+
+#include <string>
+
+#include <process/owned.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
+#include "slave/state.hpp"
+
+#include "slave/containerizer/mesos/isolator.hpp"
+
+#include "slave/containerizer/mesos/isolators/xfs/utils.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class XfsDiskIsolatorProcess : public MesosIsolatorProcess
+{
+public:
+ static Try<mesos::slave::Isolator*> create(const Flags& flags);
+
+ virtual ~XfsDiskIsolatorProcess();
+
+ process::PID<XfsDiskIsolatorProcess> self() const
+ {
+ return process::PID<XfsDiskIsolatorProcess>(this);
+ }
+
+ virtual process::Future<Nothing> recover(
+ const std::list<mesos::slave::ContainerState>& states,
+ const hashset<ContainerID>& orphans);
+
+ virtual process::Future<Option<mesos::slave::ContainerLaunchInfo>> prepare(
+ const ContainerID& containerId,
+ const mesos::slave::ContainerConfig& containerConfig);
+
+ virtual process::Future<Nothing> isolate(
+ const ContainerID& containerId,
+ pid_t pid);
+
+ virtual process::Future<mesos::slave::ContainerLimitation> watch(
+ const ContainerID& containerId);
+
+ virtual process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ virtual process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ virtual process::Future<Nothing> cleanup(
+ const ContainerID& containerId);
+
+private:
+ XfsDiskIsolatorProcess(
+ const Flags& flags,
+ const IntervalSet<prid_t>& projectIds);
+
+ // Take the next project ID from the unallocated pool.
+ Option<prid_t> nextProjectId();
+
+ // Return this project ID to the unallocated pool.
+ void returnProjectId(prid_t projectId);
+
+ struct Info
+ {
+ explicit Info(const std::string& _directory, prid_t _projectId)
+ : directory(_directory), quota(0), projectId(_projectId) {}
+
+ const std::string directory;
+ Bytes quota;
+ const prid_t projectId;
+ };
+
+ const Flags flags;
+ const IntervalSet<prid_t> totalProjectIds;
+ IntervalSet<prid_t> freeProjectIds;
+ hashmap<ContainerID, process::Owned<Info>> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __XFS_DISK_ISOLATOR_HPP__
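
As a rough illustration of the allocation policy behind nextProjectId() and returnProjectId() (hand out the lowest free ID; drop, rather than reuse, IDs recovered from outside the currently configured range), here is a simplified standalone sketch using std::set in place of IntervalSet. ProjectIdPool and its methods are hypothetical stand-ins, not Mesos code.

#include <iostream>
#include <set>

class ProjectIdPool
{
public:
  ProjectIdPool(int begin, int end)
  {
    for (int id = begin; id <= end; id++) {
      total.insert(id);
      free.insert(id);
    }
  }

  // Mirrors nextProjectId(): -1 signals that the range is exhausted.
  int allocate()
  {
    if (free.empty()) {
      return -1;
    }

    int id = *free.begin();
    free.erase(free.begin());
    return id;
  }

  // Mirrors returnProjectId(): only IDs inside the configured total
  // range go back into the pool.
  void release(int id)
  {
    if (total.count(id) > 0) {
      free.insert(id);
    }
  }

private:
  std::set<int> total;
  std::set<int> free;
};

int main()
{
  ProjectIdPool pool(5000, 5002);

  std::cout << pool.allocate() << std::endl; // 5000
  std::cout << pool.allocate() << std::endl; // 5001

  pool.release(5000);
  std::cout << pool.allocate() << std::endl; // 5000 is reused

  pool.release(4000);                        // outside the range: dropped
  std::cout << pool.allocate() << std::endl; // 5002

  return 0;
}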
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/utils.cpp b/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
index 9285183..92914af 100644
--- a/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
+++ b/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
@@ -379,6 +379,12 @@ Option<Error> validateProjectIds(const IntervalSet<prid_t>& projectRange)
return None();
}
+
+bool pathIsXfs(const std::string& path)
+{
+ return ::platform_test_xfs_path(path.c_str()) == 1;
+}
+
} // namespace xfs {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/utils.hpp b/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
index 654dc73..7602fe3 100644
--- a/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
+++ b/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
@@ -46,6 +46,9 @@ inline bool operator==(const QuotaInfo& left, const QuotaInfo& right)
Option<Error> validateProjectIds(const IntervalSet<prid_t>& projectRange);
+bool pathIsXfs(const std::string& path);
+
+
Result<QuotaInfo> getProjectQuota(
const std::string& path,
prid_t projectId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7164afe..dd7bc9a 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -776,4 +776,11 @@ mesos::internal::slave::Flags::Flags()
"The symbol name of the master detector to use. This symbol\n"
"should exist in a module specified through the --modules flag.\n"
"Cannot be used in conjunction with --master.");
+
+#if ENABLE_XFS_DISK_ISOLATOR
+ add(&Flags::xfs_project_range,
+ "xfs_project_range",
+ "The ranges of XFS project IDs to use for tracking directory quotas",
+ "[5000-10000]");
+#endif
}
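
A hedged standalone sketch of how a range string such as the "[5000-10000]" default above is parsed, mirroring the Resources::parse() call made in XfsDiskIsolatorProcess::create(). It assumes the snippet is compiled and linked against libmesos; error handling is reduced to a message and an exit code.

#include <iostream>

#include <mesos/resources.hpp>

#include <stout/try.hpp>

using mesos::Resource;
using mesos::Resources;
using mesos::Value;

int main()
{
  Try<Resource> projects = Resources::parse("projects", "[5000-10000]", "*");

  if (projects.isError()) {
    std::cerr << "Parse failed: " << projects.error() << std::endl;
    return 1;
  }

  if (projects.get().type() != Value::RANGES) {
    std::cerr << "Expected a RANGES value" << std::endl;
    return 1;
  }

  // Walk each range; the isolator converts these into an
  // IntervalSet<prid_t> of candidate project IDs.
  const Value::Ranges& ranges = projects.get().ranges();

  for (int i = 0; i < ranges.range_size(); i++) {
    std::cout << "range: [" << ranges.range(i).begin() << ", "
              << ranges.range(i).end() << "]" << std::endl;
  }

  return 0;
}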
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 4236b7f..300db49 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -144,6 +144,9 @@ public:
Duration qos_correction_interval_min;
Duration oversubscribed_resources_interval;
Option<std::string> master_detector;
+#if ENABLE_XFS_DISK_ISOLATOR
+ std::string xfs_project_range;
+#endif
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/tests/containerizer/xfs_quota_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/xfs_quota_tests.cpp b/src/tests/containerizer/xfs_quota_tests.cpp
index 8b0322b..61ea2e5 100644
--- a/src/tests/containerizer/xfs_quota_tests.cpp
+++ b/src/tests/containerizer/xfs_quota_tests.cpp
@@ -36,8 +36,8 @@
#include "master/master.hpp"
-#include "slave/constants.hpp"
#include "slave/flags.hpp"
+#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
@@ -62,8 +62,11 @@ using mesos::internal::master::Master;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;
+using mesos::internal::slave::MesosContainerizerProcess;
using mesos::internal::slave::Slave;
+using mesos::master::detector::MasterDetector;
+
namespace mesos {
namespace internal {
namespace tests {
@@ -159,6 +162,7 @@ public:
// We only need an XFS-specific directory for the work directory. We
// don't mind that other flags refer to a different temp directory.
flags.work_dir = mountPoint.get();
+ flags.isolation = "xfs/disk";
return flags;
}
@@ -275,6 +279,10 @@ TEST_F(ROOT_XFS_QuotaTest, ProjectIdErrors)
}
+// Verify that directories are isolated with respect to XFS quotas. We
+// create two trees which have symlinks into each other. If we followed
+// the symlinks when applying the project IDs to the directories, then the
+// quotas would end up being incorrect.
TEST_F(ROOT_XFS_QuotaTest, DirectoryTree)
{
Bytes limit = Megabytes(100);
@@ -332,6 +340,421 @@ TEST_F(ROOT_XFS_QuotaTest, DirectoryTree)
getProjectQuota(rootB, projectB));
}
+
+// Verify that a task that tries to consume more space than it has requested
+// is only allowed to consume exactly the assigned resources. We tell dd
+// to write 2MB but only give it 1MB of resources and (roughly) verify that
+// it exits with a failure (that should be a write error).
+TEST_F(ROOT_XFS_QuotaTest, DiskUsageExceedsQuota)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), CreateSlaveFlags());
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ const Offer& offer = offers.get()[0];
+
+ // Create a task which requests 1MB disk but tries to write 2MB,
+ // exceeding its quota.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:128;disk:1").get(),
+ "dd if=/dev/zero of=file bs=1048576 count=2");
+
+ Future<TaskStatus> status1;
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status1))
+ .WillOnce(FutureArg<1>(&status2));
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(status1);
+ EXPECT_EQ(task.task_id(), status1.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(task.task_id(), status2.get().task_id());
+ EXPECT_EQ(TASK_FAILED, status2.get().state());
+
+ // Unlike the posix/disk isolator, the reason for task failure
+ // should be that dd got an IO error.
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status2.get().source());
+ EXPECT_EQ("Command exited with status 1", status2.get().message());
+
+ driver.stop();
+ driver.join();
+}
+
+
+// Verify that we can get accurate resource statistics from the XFS
+// disk isolator.
+TEST_F(ROOT_XFS_QuotaTest, ResourceStatistics)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Fetcher fetcher;
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ slave::Flags flags = CreateSlaveFlags();
+
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, true, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+ Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), containerizer.get(), flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ // Create a task that tries to write 4MB against its 3MB disk allocation
+ // but doesn't fail (when dd hits the quota it falls back to sleeping).
+ // We will verify that the allocated disk is completely filled.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:128;disk:3").get(),
+ "dd if=/dev/zero of=file bs=1048576 count=4 || sleep 1000");
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+ AWAIT_READY(containers);
+ ASSERT_EQ(1u, containers.get().size());
+
+ ContainerID containerId = *(containers.get().begin());
+ Timeout timeout = Timeout::in(Seconds(5));
+
+ while (true) {
+ Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+ AWAIT_READY(usage);
+
+ ASSERT_TRUE(usage.get().has_disk_limit_bytes());
+ EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+
+ if (usage.get().has_disk_used_bytes()) {
+ // Usage must always be <= the limit.
+ EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+
+ // Usage might not have reached the limit yet, but it must
+ // eventually hit the limit without exceeding it.
+ if (usage.get().disk_used_bytes() >= usage.get().disk_limit_bytes()) {
+ EXPECT_EQ(
+ usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+ EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_used_bytes()));
+ break;
+ }
+ }
+
+ ASSERT_FALSE(timeout.expired());
+ os::sleep(Milliseconds(1));
+ }
+
+ driver.stop();
+ driver.join();
+}
+
+
+// In this test, the framework is not checkpointed. This ensures that when we
+// stop the slave, the executor is killed and we will need to recover the
+// working directories without getting any checkpointed recovery state.
+TEST_F(ROOT_XFS_QuotaTest, NoCheckpointRecovery)
+{
+ slave::Flags flags = CreateSlaveFlags();
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:128;disk:1").get(),
+ "dd if=/dev/zero of=file bs=1048576 count=1; sleep 1000");
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillOnce(Return());
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ Future<ResourceUsage> usage1 =
+ process::dispatch(slave.get()->pid, &Slave::usage);
+ AWAIT_READY(usage1);
+
+ // We should have 1 executor using resources.
+ ASSERT_EQ(1, usage1.get().executors().size());
+ EXPECT_EQ(Megabytes(1), usage1->executors(0).statistics().disk_limit_bytes());
+ EXPECT_EQ(Megabytes(1), usage1->executors(0).statistics().disk_used_bytes());
+
+ // Restart the slave.
+ slave.get()->terminate();
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Following the example of the filesystem isolator tests, wait
+ // until the containerizer cleans up the orphans. Only after that
+ // should we expect to find the project IDs removed.
+ Future<Nothing> _recover =
+ FUTURE_DISPATCH(_, &MesosContainerizerProcess::___recover);
+ AWAIT_READY(_recover);
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ Future<ResourceUsage> usage2 =
+ process::dispatch(slave.get()->pid, &Slave::usage);
+ AWAIT_READY(usage2);
+
+ // We should have no executors left because we didn't checkpoint.
+ ASSERT_EQ(0, usage2.get().executors().size());
+
+ Try<std::list<std::string>> sandboxes = os::glob(path::join(
+ slave::paths::getSandboxRootDir(mountPoint.get()),
+ "*",
+ "frameworks",
+ "*",
+ "executors",
+ "*",
+ "runs",
+ "*"));
+
+ ASSERT_SOME(sandboxes);
+
+ // One sandbox and one symlink.
+ ASSERT_EQ(2u, sandboxes->size());
+
+ // Scan the remaining sandboxes and make sure that no projects are assigned.
+ foreach (const string& sandbox, sandboxes.get()) {
+ // Skip the "latest" symlink.
+ if (os::stat::islink(sandbox)) {
+ continue;
+ }
+
+ EXPECT_NONE(xfs::getProjectId(sandbox));
+ }
+
+ driver.stop();
+ driver.join();
+}
+
+
+// In this test, the framework is checkpointed so we expect the executor to
+// persist across the slave restart and to have the same resource usage before
+// and after.
+TEST_F(ROOT_XFS_QuotaTest, CheckpointRecovery)
+{
+ slave::Flags flags = CreateSlaveFlags();
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:128;disk:1").get(),
+ "dd if=/dev/zero of=file bs=1048576 count=1; sleep 1000");
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ Future<ResourceUsage> usage1 =
+ process::dispatch(slave.get()->pid, &Slave::usage);
+ AWAIT_READY(usage1);
+
+ // We should have 1 executor using resources.
+ ASSERT_EQ(1, usage1.get().executors().size());
+
+ // Restart the slave.
+ slave.get()->terminate();
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Wait for the slave to re-register.
+ AWAIT_READY(slaveReregisteredMessage);
+
+ Future<ResourceUsage> usage2 =
+ process::dispatch(slave.get()->pid, &Slave::usage);
+ AWAIT_READY(usage2);
+
+ // We should still have 1 executor using resources after the restart.
+ ASSERT_EQ(1, usage2.get().executors().size());
+
+ Try<std::list<std::string>> sandboxes = os::glob(path::join(
+ slave::paths::getSandboxRootDir(mountPoint.get()),
+ "*",
+ "frameworks",
+ "*",
+ "executors",
+ "*",
+ "runs",
+ "*"));
+
+ ASSERT_SOME(sandboxes);
+
+ // One sandbox and one symlink.
+ ASSERT_EQ(2u, sandboxes->size());
+
+ // Scan the remaining sandboxes. We ought to still have project IDs
+ // assigned to them all.
+ foreach (const string& sandbox, sandboxes.get()) {
+ // Skip the "latest" symlink.
+ if (os::stat::islink(sandbox)) {
+ continue;
+ }
+
+ EXPECT_SOME(xfs::getProjectId(sandbox));
+ }
+
+ driver.stop();
+ driver.join();
+}
+
+
+TEST_F(ROOT_XFS_QuotaTest, IsolatorFlags)
+{
+ slave::Flags flags;
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ // work_dir must be an XFS filesystem.
+ flags = CreateSlaveFlags();
+ flags.work_dir = "/proc";
+ ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+ // 0 is an invalid project ID.
+ flags = CreateSlaveFlags();
+ flags.xfs_project_range = "[0-10]";
+ ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+ // Project IDs are 32 bit.
+ flags = CreateSlaveFlags();
+ flags.xfs_project_range = "[100-1099511627776]";
+ ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+ // Project IDs must be a range.
+ flags = CreateSlaveFlags();
+ flags.xfs_project_range = "foo";
+ ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+ // Project IDs must be a range.
+ flags = CreateSlaveFlags();
+ flags.xfs_project_range = "100";
+ ASSERT_ERROR(StartSlave(detector.get(), flags));
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {