Posted to commits@mesos.apache.org by ya...@apache.org on 2016/04/09 01:52:08 UTC

[4/6] mesos git commit: Add XFS disk isolator tests.

Add XFS disk isolator tests.

Review: https://reviews.apache.org/r/44949/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/255710b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/255710b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/255710b7

Branch: refs/heads/master
Commit: 255710b7c95e578c873e1317e3705a55e81b1f61
Parents: 04be1d0
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 8 14:53:56 2016 -0700
Committer: Jiang Yan Xu <ya...@jxu.me>
Committed: Fri Apr 8 16:46:08 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   4 +-
 src/slave/containerizer/mesos/containerizer.cpp |   7 +
 .../containerizer/mesos/isolators/xfs/disk.cpp  | 437 +++++++++++++++++++
 .../containerizer/mesos/isolators/xfs/disk.hpp  | 107 +++++
 .../containerizer/mesos/isolators/xfs/utils.cpp |   6 +
 .../containerizer/mesos/isolators/xfs/utils.hpp |   3 +
 src/slave/flags.cpp                             |   7 +
 src/slave/flags.hpp                             |   3 +
 src/tests/containerizer/xfs_quota_tests.cpp     | 425 +++++++++++++++++-
 9 files changed, 997 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a16c2da..dc8f8e3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -899,7 +899,9 @@ MESOS_LINUX_FILES +=							\
 if ENABLE_XFS_DISK_ISOLATOR
 MESOS_LINUX_FILES +=                                                    \
   slave/containerizer/mesos/isolators/xfs/utils.cpp                     \
-  slave/containerizer/mesos/isolators/xfs/utils.hpp
+  slave/containerizer/mesos/isolators/xfs/utils.hpp                     \
+  slave/containerizer/mesos/isolators/xfs/disk.cpp                      \
+  slave/containerizer/mesos/isolators/xfs/disk.hpp
 endif
 
 MESOS_NETWORK_ISOLATOR_FILES =						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index a5dd223..c25fa92 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -55,6 +55,10 @@
 
 #include "slave/containerizer/mesos/isolators/posix/disk.hpp"
 
+#if ENABLE_XFS_DISK_ISOLATOR
+#include "slave/containerizer/mesos/isolators/xfs/disk.hpp"
+#endif
+
 #ifdef __linux__
 #include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups/mem.hpp"
@@ -215,6 +219,9 @@ Try<MesosContainerizer*> MesosContainerizer::create(
     {"posix/cpu", &PosixCpuIsolatorProcess::create},
     {"posix/mem", &PosixMemIsolatorProcess::create},
     {"posix/disk", &PosixDiskIsolatorProcess::create},
+#if ENABLE_XFS_DISK_ISOLATOR
+    {"xfs/disk", &XfsDiskIsolatorProcess::create},
+#endif
 #ifdef __linux__
     {"cgroups/cpu", &CgroupsCpushareIsolatorProcess::create},
     {"cgroups/mem", &CgroupsMemIsolatorProcess::create},

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/disk.cpp b/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
new file mode 100644
index 0000000..2f65f0a
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "slave/containerizer/mesos/isolators/xfs/disk.hpp"
+
+#include <glog/logging.h>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/os.hpp>
+
+#include <stout/os/stat.hpp>
+
+#include "slave/paths.hpp"
+
+using std::list;
+using std::string;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::PID;
+using process::Process;
+using process::Promise;
+
+using mesos::slave::ContainerConfig;
+using mesos::slave::ContainerLaunchInfo;
+using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerState;
+using mesos::slave::Isolator;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+static Try<IntervalSet<prid_t>> getIntervalSet(
+    const Value::Ranges& ranges)
+{
+  IntervalSet<prid_t> set;
+
+  for (int i = 0; i < ranges.range_size(); i++) {
+    if (ranges.range(i).end() > std::numeric_limits<prid_t>::max()) {
+      return Error("Project ID " + stringify(ranges.range(i).end()) +
+                   " is out of range");
+    }
+
+    set += (Bound<prid_t>::closed(ranges.range(i).begin()),
+            Bound<prid_t>::closed(ranges.range(i).end()));
+  }
+
+  return set;
+}
+
+
+static Option<Bytes> getDiskResource(
+    const Resources& resources)
+{
+  Option<Bytes> bytes = None();
+
+  foreach (const Resource& resource, resources) {
+    if (resource.name() != "disk") {
+      continue;
+    }
+
+    // TODO(jpeach): Ignore persistent volume resources. The problem here is
+    // that we need to guarantee that we can track the removal of every
+    // directory for which we assign a project ID. Since destruction of
+    // persistent volumes is not visible to the isolator, we don't want to
+    // risk leaking the project ID, or spuriously reusing it.
+    if (Resources::isPersistentVolume(resource)) {
+      continue;
+    }
+
+    if (resource.has_disk() && resource.disk().has_volume()) {
+      continue;
+    }
+
+    if (bytes.isSome()) {
+      bytes.get() += Megabytes(resource.scalar().value());
+    } else {
+      bytes = Megabytes(resource.scalar().value());
+    }
+  }
+
+  return bytes;
+}
+
+
+Try<Isolator*> XfsDiskIsolatorProcess::create(const Flags& flags)
+{
+  if (!xfs::pathIsXfs(flags.work_dir)) {
+    return Error("'" + flags.work_dir + "' is not an XFS filesystem");
+  }
+
+  Result<uid_t> uid = os::getuid();
+  CHECK_SOME(uid) << "getuid(2) doesn't fail";
+
+  if (uid.get() != 0) {
+    return Error("The XFS disk isolator requires running as root.");
+  }
+
+  Try<Resource> projects =
+    Resources::parse("projects", flags.xfs_project_range, "*");
+
+  if (projects.isError()) {
+    return Error(
+        "Failed to parse XFS project range '" +
+        flags.xfs_project_range +
+        "'");
+  }
+
+  if (projects.get().type() != Value::RANGES) {
+    return Error(
+        "Invalid XFS project resource type " +
+        mesos::Value_Type_Name(projects.get().type()) +
+        ", expecting " +
+        mesos::Value_Type_Name(Value::RANGES));
+  }
+
+  Try<IntervalSet<prid_t>> totalProjectIds =
+    getIntervalSet(projects.get().ranges());
+
+  if (totalProjectIds.isError()) {
+    return Error(totalProjectIds.error());
+  }
+
+  Option<Error> status = xfs::validateProjectIds(totalProjectIds.get());
+  if (status.isSome()) {
+    return Error(status->message);
+  }
+
+  return new MesosIsolator(Owned<MesosIsolatorProcess>(
+      new XfsDiskIsolatorProcess(flags, totalProjectIds.get())));
+}
+
+
+XfsDiskIsolatorProcess::XfsDiskIsolatorProcess(
+    const Flags& _flags,
+    const IntervalSet<prid_t>& projectIds)
+  : flags(_flags),
+    totalProjectIds(projectIds),
+    freeProjectIds(projectIds)
+{
+  // At the beginning, the free project range is the same as the
+  // configured project range.
+
+  LOG(INFO) << "Allocating XFS project IDs from the range " << totalProjectIds;
+}
+
+
+XfsDiskIsolatorProcess::~XfsDiskIsolatorProcess() {}
+
+
+Future<Nothing> XfsDiskIsolatorProcess::recover(
+    const list<ContainerState>& states,
+    const hashset<ContainerID>& orphans)
+{
+  // We don't need to explicitly deal with orphans since we are primarily
+  // concerned with the on-disk state. We scan all the sandbox directories
+  // for project IDs that we have not recovered and make a best effort to
+  // remove all the corresponding on-disk state.
+  Try<std::list<std::string>> sandboxes = os::glob(path::join(
+      paths::getSandboxRootDir(flags.work_dir),
+      "*",
+      "frameworks",
+      "*",
+      "executors",
+      "*",
+      "runs",
+      "*"));
+
+  if (sandboxes.isError()) {
+    return Failure("Failed to scan sandbox directories: " + sandboxes.error());
+  }
+
+  hashset<ContainerID> alive;
+
+  foreach (const ContainerState& state, states) {
+    alive.insert(state.container_id());
+  }
+
+  foreach (const string& sandbox, sandboxes.get()) {
+    // Skip the "latest" symlink.
+    if (os::stat::islink(sandbox)) {
+      continue;
+    }
+
+    ContainerID containerId;
+    containerId.set_value(Path(sandbox).basename());
+
+    CHECK(!infos.contains(containerId)) << "ContainerIDs should never collide";
+
+    // We fail the isolator recovery upon failure in any container because
+    // failing to get the project ID usually suggests some fatal issue on the
+    // host.
+    Result<prid_t> projectId = xfs::getProjectId(sandbox);
+    if (projectId.isError()) {
+      return Failure(projectId.error());
+    }
+
+    // If there is no project ID, don't worry about it. This can happen the
+    // first time an operator enables the XFS disk isolator and we recover a
+    // set of containers that we did not isolate.
+    if (projectId.isNone()) {
+      continue;
+    }
+
+    infos.put(containerId, Owned<Info>(new Info(sandbox, projectId.get())));
+    freeProjectIds -= projectId.get();
+
+    // If this is a known orphan, the containerizer will send a cleanup call
+    // later. If this is a live container, we will manage it. Otherwise, we have
+    // to dispatch a cleanup ourselves.  Note that we don't wait for the result
+    // of the cleanups as we don't want to block agent recovery for unknown
+    // orphans.
+    if (!orphans.contains(containerId) && !alive.contains(containerId)) {
+      dispatch(self(), &XfsDiskIsolatorProcess::cleanup, containerId);
+    }
+  }
+
+  return Nothing();
+}
+
+
+// We want to assign the project ID as early as possible. New inodes created
+// under a directory automatically inherit its XFS project ID, so doing this
+// early saves the work of manually assigning the ID to a lot of files.
+Future<Option<ContainerLaunchInfo>> XfsDiskIsolatorProcess::prepare(
+    const ContainerID& containerId,
+    const ContainerConfig& containerConfig)
+{
+  if (infos.contains(containerId)) {
+    return Failure("Container has already been prepared");
+  }
+
+  Option<prid_t> projectId = nextProjectId();
+  if (projectId.isNone()) {
+    return Failure("Failed to assign project ID, range exhausted");
+  }
+
+  // Keep a record of this container so that cleanup() can remove it if
+  // we fail to assign the project ID.
+  infos.put(
+      containerId,
+      Owned<Info>(new Info(containerConfig.directory(), projectId.get())));
+
+  Try<Nothing> status = xfs::setProjectId(
+      containerConfig.directory(), projectId.get());
+
+  if (status.isError()) {
+    return Failure(
+        "Failed to assign project " + stringify(projectId.get()) + ": " +
+        status.error());
+  }
+
+  LOG(INFO) << "Assigned project " << stringify(projectId.get()) << " to '"
+            << containerConfig.directory() << "'";
+
+  return update(containerId, containerConfig.executor_info().resources())
+    .then([]() -> Future<Option<ContainerLaunchInfo>> {
+      return None();
+    });
+}
+
+
+Future<Nothing> XfsDiskIsolatorProcess::isolate(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  return Nothing();
+}
+
+
+Future<ContainerLimitation> XfsDiskIsolatorProcess::watch(
+    const ContainerID& containerId)
+{
+  // We have nothing to do here, since the XFS quota is enforcing
+  // the limitation.
+  return Future<ContainerLimitation>();
+}
+
+
+Future<Nothing> XfsDiskIsolatorProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  CHECK(infos.contains(containerId));
+
+  const Owned<Info>& info = infos[containerId];
+
+  Option<Bytes> needed = getDiskResource(resources);
+  if (needed.isNone()) {
+    // TODO(jpeach): If there's no disk resource attached, we should set the
+    // minimum quota (1 block), since a zero quota would be unconstrained.
+    LOG(WARNING) << "Ignoring quota update with no disk resources";
+    return Nothing();
+  }
+
+  // Only update the disk quota if it has changed.
+  if (needed.get() != info->quota) {
+    Try<Nothing> status =
+      xfs::setProjectQuota(info->directory, info->projectId, needed.get());
+
+    if (status.isError()) {
+      return Failure("Failed to update quota for project " +
+                     stringify(info->projectId) + ": " + status.error());
+    }
+
+    info->quota = needed.get();
+
+    LOG(INFO) << "Set quota on container " << containerId
+              << " for project " << info->projectId
+              << " to " << info->quota;
+  }
+
+  return Nothing();
+}
+
+
+Future<ResourceStatistics> XfsDiskIsolatorProcess::usage(
+    const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  ResourceStatistics statistics;
+  const Owned<Info>& info = infos[containerId];
+
+  Result<xfs::QuotaInfo> quota = xfs::getProjectQuota(
+      info->directory, info->projectId);
+
+  if (quota.isError()) {
+    return Failure(quota.error());
+  }
+
+  if (quota.isSome()) {
+    statistics.set_disk_limit_bytes(quota.get().limit.bytes());
+    statistics.set_disk_used_bytes(quota.get().used.bytes());
+  }
+
+  return statistics;
+}
+
+
+// Remove all the quota state that was created for this container. We
+// make a best effort to remove all the state we can, so we keep going
+// even if one operation fails so that we can remove subsequent state.
+Future<Nothing> XfsDiskIsolatorProcess::cleanup(const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    LOG(INFO) << "Ignoring cleanup for unknown container " << containerId;
+    return Nothing();
+  }
+
+  // Take a copy of the Info we are removing so that we can use it
+  // to construct the Failure message if necessary.
+  const Info info = *infos[containerId];
+
+  infos.erase(containerId);
+
+  LOG(INFO) << "Removing project ID " << info.projectId
+            << " from '" << info.directory << "'";
+
+  Try<Nothing> quotaStatus = xfs::clearProjectQuota(
+      info.directory, info.projectId);
+
+  if (quotaStatus.isError()) {
+    LOG(ERROR) << "Failed to clear quota for '"
+               << info.directory << "': " << quotaStatus.error();
+  }
+
+  Try<Nothing> projectStatus = xfs::clearProjectId(info.directory);
+  if (projectStatus.isError()) {
+    LOG(ERROR) << "Failed to remove project ID "
+               << info.projectId
+               << " from '" << info.directory << "': "
+               << projectStatus.error();
+  }
+
+  // If we failed to remove the on-disk project ID we can't reclaim it
+  // because the quota would then be applied across two containers. This
+  // would be a project ID leak, but we could recover it at GC time if
+  // that was visible to isolators.
+  if (quotaStatus.isError() || projectStatus.isError()) {
+    freeProjectIds -= info.projectId;
+    return Failure("Failed to cleanup '" + info.directory + "'");
+  } else {
+    returnProjectId(info.projectId);
+    return Nothing();
+  }
+}
+
+
+Option<prid_t> XfsDiskIsolatorProcess::nextProjectId()
+{
+  if (freeProjectIds.empty()) {
+    return None();
+  }
+
+  prid_t projectId = freeProjectIds.begin()->lower();
+
+  freeProjectIds -= projectId;
+  return projectId;
+}
+
+void XfsDiskIsolatorProcess::returnProjectId(
+    prid_t projectId)
+{
+  // Only return this project ID to the free range if it is in the total
+  // range. This could happen if the total range is changed by the operator
+  // and we recover a previous container from the old range.
+  if (totalProjectIds.contains(projectId)) {
+    freeProjectIds += projectId;
+  }
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
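
As an illustrative aside, nextProjectId() and returnProjectId() treat the
configured project IDs as a simple pool: allocation takes the lowest free ID,
and an ID is reclaimed only if it still falls inside the configured total
range (it may not, if the operator changed the range across an agent restart).
A dependency-free sketch of that behaviour, using hypothetical names and
std::set in place of IntervalSet<prid_t>:

    #include <cstdint>
    #include <iostream>
    #include <set>

    int main()
    {
      using ProjectId = std::uint32_t;  // stand-in for the XFS prid_t

      const std::set<ProjectId> total = {5000, 5001, 5002};  // configured range
      std::set<ProjectId> freeIds = total;                   // initially all free

      // nextProjectId(): take the lowest free ID, or report exhaustion.
      if (freeIds.empty()) {
        std::cerr << "Failed to assign project ID, range exhausted" << std::endl;
        return 1;
      }

      ProjectId id = *freeIds.begin();
      freeIds.erase(freeIds.begin());

      // returnProjectId(): only reclaim IDs that are still in the total range.
      if (total.count(id) > 0) {
        freeIds.insert(id);
      }

      std::cout << "allocated and returned project " << id << std::endl;
      return 0;
    }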

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/disk.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/disk.hpp b/src/slave/containerizer/mesos/isolators/xfs/disk.hpp
new file mode 100644
index 0000000..822de65
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/xfs/disk.hpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __XFS_DISK_ISOLATOR_HPP__
+#define __XFS_DISK_ISOLATOR_HPP__
+
+#include <string>
+
+#include <process/owned.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
+#include "slave/state.hpp"
+
+#include "slave/containerizer/mesos/isolator.hpp"
+
+#include "slave/containerizer/mesos/isolators/xfs/utils.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class XfsDiskIsolatorProcess : public MesosIsolatorProcess
+{
+public:
+  static Try<mesos::slave::Isolator*> create(const Flags& flags);
+
+  virtual ~XfsDiskIsolatorProcess();
+
+  process::PID<XfsDiskIsolatorProcess> self() const
+  {
+    return process::PID<XfsDiskIsolatorProcess>(this);
+  }
+
+  virtual process::Future<Nothing> recover(
+      const std::list<mesos::slave::ContainerState>& states,
+      const hashset<ContainerID>& orphans);
+
+  virtual process::Future<Option<mesos::slave::ContainerLaunchInfo>> prepare(
+      const ContainerID& containerId,
+      const mesos::slave::ContainerConfig& containerConfig);
+
+  virtual process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual process::Future<Nothing> cleanup(
+      const ContainerID& containerId);
+
+private:
+  XfsDiskIsolatorProcess(
+      const Flags& flags,
+      const IntervalSet<prid_t>& projectIds);
+
+  // Take the next project ID from the unallocated pool.
+  Option<prid_t> nextProjectId();
+
+  // Return this project ID to the unallocated pool.
+  void returnProjectId(prid_t projectId);
+
+  struct Info
+  {
+    explicit Info(const std::string& _directory, prid_t _projectId)
+      : directory(_directory), quota(0), projectId(_projectId) {}
+
+    const std::string directory;
+    Bytes quota;
+    const prid_t projectId;
+  };
+
+  const Flags flags;
+  const IntervalSet<prid_t> totalProjectIds;
+  IntervalSet<prid_t> freeProjectIds;
+  hashmap<ContainerID, process::Owned<Info>> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __XFS_DISK_ISOLATOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/utils.cpp b/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
index 9285183..92914af 100644
--- a/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
+++ b/src/slave/containerizer/mesos/isolators/xfs/utils.cpp
@@ -379,6 +379,12 @@ Option<Error> validateProjectIds(const IntervalSet<prid_t>& projectRange)
   return None();
 }
 
+
+bool pathIsXfs(const std::string& path)
+{
+  return ::platform_test_xfs_path(path.c_str()) == 1;
+}
+
 } // namespace xfs {
 } // namespace internal {
 } // namespace mesos {
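
pathIsXfs() is a thin wrapper over platform_test_xfs_path() from the xfsprogs
development headers; it is what XfsDiskIsolatorProcess::create() uses to
reject a non-XFS work directory. A standalone sketch of the same check
(assumes <xfs/xfs.h> is installed):

    #include <xfs/xfs.h>

    #include <iostream>

    int main(int argc, char** argv)
    {
      const char* path = argc > 1 ? argv[1] : "/";

      // platform_test_xfs_path() returns 1 when the path is on XFS.
      if (platform_test_xfs_path(path) == 1) {
        std::cout << "'" << path << "' is an XFS filesystem" << std::endl;
      } else {
        std::cout << "'" << path << "' is not an XFS filesystem" << std::endl;
      }

      return 0;
    }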

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/xfs/utils.hpp b/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
index 654dc73..7602fe3 100644
--- a/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
+++ b/src/slave/containerizer/mesos/isolators/xfs/utils.hpp
@@ -46,6 +46,9 @@ inline bool operator==(const QuotaInfo& left, const QuotaInfo& right)
 Option<Error> validateProjectIds(const IntervalSet<prid_t>& projectRange);
 
 
+bool pathIsXfs(const std::string& path);
+
+
 Result<QuotaInfo> getProjectQuota(
     const std::string& path,
     prid_t projectId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7164afe..dd7bc9a 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -776,4 +776,11 @@ mesos::internal::slave::Flags::Flags()
       "The symbol name of the master detector to use. This symbol\n"
       "should exist in a module specified through the --modules flag.\n"
       "Cannot be used in conjunction with --master.");
+
+#if ENABLE_XFS_DISK_ISOLATOR
+  add(&Flags::xfs_project_range,
+      "xfs_project_range",
+      "The ranges of XFS project IDs to use for tracking directory quotas",
+      "[5000-10000]");
+#endif
 }
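
The new flag is a plain string defaulting to "[5000-10000]";
XfsDiskIsolatorProcess::create() (in disk.cpp above) parses it with
Resources::parse() and rejects anything that is not a RANGES value, which is
what the IsolatorFlags test below exercises. A minimal sketch of that
validation, mirroring the create() code (illustrative only):

    Try<Resource> projects =
      Resources::parse("projects", "[5000-10000]", "*");

    if (projects.isError()) {
      // A string such as "foo" fails to parse as a resource value at all.
    } else if (projects.get().type() != Value::RANGES) {
      // A string such as "100" parses, but as a SCALAR rather than RANGES.
    }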

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 4236b7f..300db49 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -144,6 +144,9 @@ public:
   Duration qos_correction_interval_min;
   Duration oversubscribed_resources_interval;
   Option<std::string> master_detector;
+#if ENABLE_XFS_DISK_ISOLATOR
+  std::string xfs_project_range;
+#endif
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/255710b7/src/tests/containerizer/xfs_quota_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/xfs_quota_tests.cpp b/src/tests/containerizer/xfs_quota_tests.cpp
index 8b0322b..61ea2e5 100644
--- a/src/tests/containerizer/xfs_quota_tests.cpp
+++ b/src/tests/containerizer/xfs_quota_tests.cpp
@@ -36,8 +36,8 @@
 
 #include "master/master.hpp"
 
-#include "slave/constants.hpp"
 #include "slave/flags.hpp"
+#include "slave/paths.hpp"
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/fetcher.hpp"
@@ -62,8 +62,11 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::MesosContainerizer;
+using mesos::internal::slave::MesosContainerizerProcess;
 using mesos::internal::slave::Slave;
 
+using mesos::master::detector::MasterDetector;
+
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -159,6 +162,7 @@ public:
     // We only need an XFS-specific directory for the work directory. We
     // don't mind that other flags refer to a different temp directory.
     flags.work_dir = mountPoint.get();
+    flags.isolation = "xfs/disk";
     return flags;
   }
 
@@ -275,6 +279,10 @@ TEST_F(ROOT_XFS_QuotaTest, ProjectIdErrors)
 }
 
 
+// Verify that directories are isolated with respect to XFS quotas. We
+// create two trees which have symlinks into each other. If we followed
+// the symlinks when applying the project IDs to the directories, then the
+// quotas would end up being incorrect.
 TEST_F(ROOT_XFS_QuotaTest, DirectoryTree)
 {
   Bytes limit = Megabytes(100);
@@ -332,6 +340,421 @@ TEST_F(ROOT_XFS_QuotaTest, DirectoryTree)
       getProjectQuota(rootB, projectB));
 }
 
+
+// Verify that a task that tries to consume more space than it has requested
+// is only allowed to consume exactly the assigned resources. We tell dd
+// to write 2MB but only give it 1MB of resources and (roughly) verify that
+// it exits with a failure (that should be a write error).
+TEST_F(ROOT_XFS_QuotaTest, DiskUsageExceedsQuota)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), CreateSlaveFlags());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  const Offer& offer = offers.get()[0];
+
+  // Create a task which requests 1MB disk, but tries to use 2MB.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128;disk:1").get(),
+      "dd if=/dev/zero of=file bs=1048576 count=2");
+
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(task.task_id(), status1.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(task.task_id(), status2.get().task_id());
+  EXPECT_EQ(TASK_FAILED, status2.get().state());
+
+  // Unlike the posix/disk isolator, the reason for task failure
+  // should be that dd got an IO error.
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status2.get().source());
+  EXPECT_EQ("Command exited with status 1", status2.get().message());
+
+  driver.stop();
+  driver.join();
+}
+
+
+// Verify that we can get accurate resource statistics from the XFS
+// disk isolator.
+TEST_F(ROOT_XFS_QuotaTest, ResourceStatistics)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Fetcher fetcher;
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  slave::Flags flags = CreateSlaveFlags();
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  // Create a task that tries to use 4MB of its 3MB disk allocation but
+  // doesn't fail (when dd hits the quota, the "|| sleep" keeps the task
+  // running). We will verify that the allocated disk is filled.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128;disk:3").get(),
+      "dd if=/dev/zero of=file bs=1048576 count=4 || sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+
+  ContainerID containerId = *(containers.get().begin());
+  Timeout timeout = Timeout::in(Seconds(5));
+
+  while (true) {
+    Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+    AWAIT_READY(usage);
+
+    ASSERT_TRUE(usage.get().has_disk_limit_bytes());
+    EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+
+    if (usage.get().has_disk_used_bytes()) {
+      // Usage must always be <= the limit.
+      EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+
+      // Usage might not have reached the limit yet, but it must eventually
+      // hit the limit without exceeding it.
+      if (usage.get().disk_used_bytes() >= usage.get().disk_limit_bytes()) {
+        EXPECT_EQ(
+            usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+        EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_used_bytes()));
+        break;
+      }
+    }
+
+    ASSERT_FALSE(timeout.expired());
+    os::sleep(Milliseconds(1));
+  }
+
+  driver.stop();
+  driver.join();
+}
+
+
+// In this test, the framework is not checkpointed. This ensures that when we
+// stop the slave, the executor is killed and we will need to recover the
+// working directories without getting any checkpointed recovery state.
+TEST_F(ROOT_XFS_QuotaTest, NoCheckpointRecovery)
+{
+  slave::Flags flags = CreateSlaveFlags();
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128;disk:1").get(),
+      "dd if=/dev/zero of=file bs=1048576 count=1; sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillOnce(Return());
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  Future<ResourceUsage> usage1 =
+    process::dispatch(slave.get()->pid, &Slave::usage);
+  AWAIT_READY(usage1);
+
+  // We should have 1 executor using resources.
+  ASSERT_EQ(1, usage1.get().executors().size());
+  EXPECT_EQ(Megabytes(1), usage1->executors(0).statistics().disk_limit_bytes());
+  EXPECT_EQ(Megabytes(1), usage1->executors(0).statistics().disk_used_bytes());
+
+  // Restart the slave.
+  slave.get()->terminate();
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Following the example of the filesystem isolator tests, wait
+  // until the containerizer cleans up the orphans. Only after that
+  // should we expect to find the project IDs removed.
+  Future<Nothing> _recover =
+    FUTURE_DISPATCH(_, &MesosContainerizerProcess::___recover);
+  AWAIT_READY(_recover);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Future<ResourceUsage> usage2 =
+    process::dispatch(slave.get()->pid, &Slave::usage);
+  AWAIT_READY(usage2);
+
+  // We should have no executors left because we didn't checkpoint.
+  ASSERT_EQ(0, usage2.get().executors().size());
+
+  Try<std::list<std::string>> sandboxes = os::glob(path::join(
+      slave::paths::getSandboxRootDir(mountPoint.get()),
+      "*",
+      "frameworks",
+      "*",
+      "executors",
+      "*",
+      "runs",
+      "*"));
+
+  ASSERT_SOME(sandboxes);
+
+  // One sandbox and one symlink.
+  ASSERT_EQ(2u, sandboxes->size());
+
+  // Scan the remaining sandboxes and make sure that no projects are assigned.
+  foreach (const string& sandbox, sandboxes.get()) {
+    // Skip the "latest" symlink.
+    if (os::stat::islink(sandbox)) {
+      continue;
+    }
+
+    EXPECT_NONE(xfs::getProjectId(sandbox));
+  }
+
+  driver.stop();
+  driver.join();
+}
+
+
+// In this test, the framework is checkpointed so we expect the executor to
+// persist across the slave restart and to have the same resource usage before
+// and after.
+TEST_F(ROOT_XFS_QuotaTest, CheckpointRecovery)
+{
+  slave::Flags flags = CreateSlaveFlags();
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), CreateSlaveFlags());
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128;disk:1").get(),
+      "dd if=/dev/zero of=file bs=1048576 count=1; sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  Future<ResourceUsage> usage1 =
+    process::dispatch(slave.get()->pid, &Slave::usage);
+  AWAIT_READY(usage1);
+
+  // We should have 1 executor using resources.
+  ASSERT_EQ(1, usage1.get().executors().size());
+
+  // Restart the slave.
+  slave.get()->terminate();
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Future<ResourceUsage> usage2 =
+    process::dispatch(slave.get()->pid, &Slave::usage);
+  AWAIT_READY(usage2);
+
+  // We should still have 1 executor using resources.
+  ASSERT_EQ(1, usage2.get().executors().size());
+
+  Try<std::list<std::string>> sandboxes = os::glob(path::join(
+      slave::paths::getSandboxRootDir(mountPoint.get()),
+      "*",
+      "frameworks",
+      "*",
+      "executors",
+      "*",
+      "runs",
+      "*"));
+
+  ASSERT_SOME(sandboxes);
+
+  // One sandbox and one symlink.
+  ASSERT_EQ(2u, sandboxes->size());
+
+  // Scan the remaining sandboxes. We ought to still have project IDs
+  // assigned to them all.
+  foreach (const string& sandbox, sandboxes.get()) {
+    // Skip the "latest" symlink.
+    if (os::stat::islink(sandbox)) {
+      continue;
+    }
+
+    EXPECT_SOME(xfs::getProjectId(sandbox));
+  }
+
+  driver.stop();
+  driver.join();
+}
+
+
+TEST_F(ROOT_XFS_QuotaTest, IsolatorFlags)
+{
+  slave::Flags flags;
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // work_dir must be an XFS filesystem.
+  flags = CreateSlaveFlags();
+  flags.work_dir = "/proc";
+  ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+  // 0 is an invalid project ID.
+  flags = CreateSlaveFlags();
+  flags.xfs_project_range = "[0-10]";
+  ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+  // Project IDs are 32 bit.
+  flags = CreateSlaveFlags();
+  flags.xfs_project_range = "[100-1099511627776]";
+  ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+  // Project IDs must be a range.
+  flags = CreateSlaveFlags();
+  flags.xfs_project_range = "foo";
+  ASSERT_ERROR(StartSlave(detector.get(), flags));
+
+  // Project IDs must be a range.
+  flags = CreateSlaveFlags();
+  flags.xfs_project_range = "100";
+  ASSERT_ERROR(StartSlave(detector.get(), flags));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {