You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2019/08/08 04:54:53 UTC
[mesos] 05/06: Updated the `disk/du` isolator to support rootfs
checks.
This is an automated email from the ASF dual-hosted git repository.
jpeach pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8ba0682521c6051b42f33b3dd96a37f4d46a290d
Author: James Peach <jp...@apache.org>
AuthorDate: Wed Aug 7 20:22:21 2019 -0700
Updated the `disk/du` isolator to support rootfs checks.
Updated the `disk/du` isolator to support isolating rootfs ephemeral
volumes. We need to keep track of the ephemeral paths and start a `du`
check for each one of them, but otherwise treat them in the same way
as the sandbox path.
Review: https://reviews.apache.org/r/71196/
---
.../containerizer/mesos/isolators/posix/disk.cpp | 144 ++++++++++++++-------
.../containerizer/mesos/isolators/posix/disk.hpp | 15 ++-
2 files changed, 108 insertions(+), 51 deletions(-)
diff --git a/src/slave/containerizer/mesos/isolators/posix/disk.cpp b/src/slave/containerizer/mesos/isolators/posix/disk.cpp
index 4869c72..29bdbe6 100644
--- a/src/slave/containerizer/mesos/isolators/posix/disk.cpp
+++ b/src/slave/containerizer/mesos/isolators/posix/disk.cpp
@@ -135,7 +135,13 @@ Future<Nothing> PosixDiskIsolatorProcess::recover(
CHECK(os::exists(state.directory()))
<< "Executor work directory " << state.directory() << " doesn't exist";
- infos.put(state.container_id(), Owned<Info>(new Info(state.directory())));
+ Owned<Info> info(new Info(state.directory()));
+
+ foreach (const string& path, state.ephemeral_volumes()) {
+ info->directories.insert(path);
+ }
+
+ infos.put(state.container_id(), info);
}
return Nothing();
@@ -157,8 +163,13 @@ Future<Option<ContainerLaunchInfo>> PosixDiskIsolatorProcess::prepare(
return Failure("Container has already been prepared");
}
- infos.put(containerId, Owned<Info>(new Info(containerConfig.directory())));
+ Owned<Info> info(new Info(containerConfig.directory()));
+
+ foreach (const string& path, containerConfig.ephemeral_volumes()) {
+ info->directories.insert(path);
+ }
+ infos.put(containerId, info);
return None();
}
@@ -232,7 +243,7 @@ Future<Nothing> PosixDiskIsolatorProcess::update(
// If either DiskInfo or DiskInfo.Volume are not set we're
// dealing with the working directory of the executor (aka the
// sandbox).
- path = info->directory;
+ path = info->sandbox;
} else {
// Otherwise it is a disk resource (such as a persistent volume)
// and we extract the path from the protobuf.
@@ -242,7 +253,7 @@ Future<Nothing> PosixDiskIsolatorProcess::update(
// is relative to the working directory of the executor. We
// always store the absolute path.
if (!path::absolute(path)) {
- path = path::join(info->directory, path);
+ path = path::join(info->sandbox, path);
}
}
@@ -253,6 +264,16 @@ Future<Nothing> PosixDiskIsolatorProcess::update(
quotas[path] += resource;
}
+ // If we still have a sandbox quota, include all the ephemeral
+ // quota directories so we include them in the collection state
+ // updates below.
+ if (quotas.contains(info->sandbox)) {
+ const Resources quota = quotas[info->sandbox];
+ foreach (const string& path, info->directories) {
+ quotas[path] = quota;
+ }
+ }
+
// Update the quota for paths. For each new path we also initiate
// the disk usage collection.
foreachpair (const string& path, const Resources& quota, quotas) {
@@ -291,9 +312,9 @@ Future<Bytes> PosixDiskIsolatorProcess::collect(
// 'du' process to incorrectly include the disk usage of the newly
// added persistent volume to the usage of the sandbox.
vector<string> excludes;
- if (path == info->directory) {
+ if (info->directories.contains(path)) {
foreachkey (const string& exclude, info->paths) {
- if (exclude != info->directory) {
+ if (!info->directories.contains(exclude)) {
excludes.push_back(exclude);
}
}
@@ -302,7 +323,7 @@ Future<Bytes> PosixDiskIsolatorProcess::collect(
// We append "/" at the end to make sure that 'du' runs on actual
// directory pointed by the symlink (and not the symlink itself).
string _path = path;
- if (path != info->directory && os::stat::islink(path)) {
+ if (!info->directories.contains(path) && os::stat::islink(path)) {
_path = path::join(path, "");
}
@@ -350,30 +371,40 @@ void PosixDiskIsolatorProcess::_collect(
// Save the last disk usage.
info->paths[path].lastUsage = future.get();
- // We need to ignore the quota enforcement check for MOUNT type
- // disk resources because its quota will be enforced by the
- // underlying filesystem.
- bool isDiskSourceMount = false;
- foreach (const Resource& resource, info->paths[path].quota) {
- if (resource.has_disk() &&
- resource.disk().has_source() &&
- resource.disk().source().type() ==
- Resource::DiskInfo::Source::MOUNT) {
- isDiskSourceMount = true;
+ if (flags.enforce_container_disk_quota) {
+ Bytes currentUsage = future.get();
+
+ // We need to ignore the quota enforcement check for MOUNT type
+ // disk resources because its quota will be enforced by the
+ // underlying filesystem.
+ bool isDiskSourceMount = false;
+ foreach (const Resource& resource, info->paths[path].quota) {
+ if (resource.has_disk() &&
+ resource.disk().has_source() &&
+ resource.disk().source().type() ==
+ Resource::DiskInfo::Source::MOUNT) {
+ isDiskSourceMount = true;
+ }
}
- }
- if (flags.enforce_container_disk_quota && !isDiskSourceMount) {
- Option<Bytes> quota = info->paths[path].quota.disk();
- CHECK_SOME(quota);
-
- if (future.get() > quota.get()) {
- info->limitation.set(
- protobuf::slave::createContainerLimitation(
- Resources(info->paths[path].quota),
- "Disk usage (" + stringify(future.get()) +
- ") exceeds quota (" + stringify(quota.get()) + ")",
- TaskStatus::REASON_CONTAINER_LIMITATION_DISK));
+ if (!isDiskSourceMount) {
+ // If this path is using the ephemeral quota, we need to
+ // estimate the total current usage.
+ if (info->directories.contains(path)) {
+ currentUsage = info->ephemeralUsage();
+ }
+
+ Option<Bytes> quota = info->paths[path].quota.disk();
+ CHECK_SOME(quota);
+
+ if (currentUsage > quota.get()) {
+ info->limitation.set(
+ protobuf::slave::createContainerLimitation(
+ Resources(info->paths[path].quota),
+ "Disk usage (" + stringify(currentUsage) +
+ ") exceeds quota (" + stringify(quota.get()) + ")",
+ TaskStatus::REASON_CONTAINER_LIMITATION_DISK));
+ }
}
}
}
@@ -400,42 +431,49 @@ Future<ResourceStatistics> PosixDiskIsolatorProcess::usage(
foreachpair (const string& path,
const Info::PathInfo& pathInfo,
info->paths) {
+ // Skip ephemeral paths. We explicitly deal with those below.
+ if (info->directories.contains(path)) {
+ continue;
+ }
+
DiskStatistics *statistics = result.add_disk_statistics();
Option<Bytes> quota = pathInfo.quota.disk();
CHECK_SOME(quota);
statistics->set_limit_bytes(quota->bytes());
- if (path == info->directory) {
- result.set_disk_limit_bytes(quota->bytes());
- }
// NOTE: There may be a large delay (# of containers * interval)
// until an initial cached value is returned here!
if (pathInfo.lastUsage.isSome()) {
statistics->set_used_bytes(pathInfo.lastUsage->bytes());
- if (path == info->directory) {
- result.set_disk_used_bytes(pathInfo.lastUsage->bytes());
- }
}
- // Set meta information for persistent volumes.
- if (path != info->directory) {
- // TODO(jieyu): For persistent volumes, validate that there is
- // only one Resource object associated with it.
- Resource resource = *pathInfo.quota.begin();
+ // TODO(jieyu): For persistent volumes, validate that there is
+ // only one Resource object associated with it.
+ Resource resource = *pathInfo.quota.begin();
- if (resource.has_disk() && resource.disk().has_source()) {
- statistics->mutable_source()->CopyFrom(resource.disk().source());
- }
+ if (resource.has_disk() && resource.disk().has_source()) {
+ statistics->mutable_source()->CopyFrom(resource.disk().source());
+ }
- if (resource.has_disk() && resource.disk().has_persistence()) {
- statistics->mutable_persistence()->CopyFrom(
- resource.disk().persistence());
- }
+ if (resource.has_disk() && resource.disk().has_persistence()) {
+ statistics->mutable_persistence()->CopyFrom(
+ resource.disk().persistence());
}
}
+ result.set_disk_used_bytes(info->ephemeralUsage().bytes());
+
+ // It doesn't matter which ephemeral path we use to get the quota,
+ // since it's replicated there.
+ result.set_disk_limit_bytes(
+ info->paths[info->sandbox].quota.disk()->bytes());
+
+ DiskStatistics *statistics = result.add_disk_statistics();
+ statistics->set_limit_bytes(result.disk_limit_bytes());
+ statistics->set_used_bytes(result.disk_used_bytes());
+
return result;
}
@@ -460,6 +498,18 @@ Future<Nothing> PosixDiskIsolatorProcess::cleanup(
}
+Bytes PosixDiskIsolatorProcess::Info::ephemeralUsage() const
+{
+ Bytes usage;
+
+ foreach (const string& path, directories) {
+ usage += paths.at(path).lastUsage.getOrElse(0);
+ }
+
+ return usage;
+}
+
+
class DiskUsageCollectorProcess : public Process<DiskUsageCollectorProcess>
{
public:
diff --git a/src/slave/containerizer/mesos/isolators/posix/disk.hpp b/src/slave/containerizer/mesos/isolators/posix/disk.hpp
index f513dfc..2e9b481 100644
--- a/src/slave/containerizer/mesos/isolators/posix/disk.hpp
+++ b/src/slave/containerizer/mesos/isolators/posix/disk.hpp
@@ -24,6 +24,7 @@
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
#include "slave/flags.hpp"
@@ -122,11 +123,17 @@ private:
struct Info
{
- explicit Info(const std::string& _directory) : directory(_directory) {}
+ explicit Info(const std::string& _directory)
+ : directories({_directory}), sandbox(_directory) {}
- // We save executor working directory here so that we know where
- // to collect disk usage for disk resources without DiskInfo.
- const std::string directory;
+ Bytes ephemeralUsage() const;
+
+ // Save the executor ephemeral storage (sandbox and rootfs)
+ // directories so that we know where to collect disk usage
+ // for disk resources without DiskInfo.
+ hashset<std::string> directories;
+
+ std::string sandbox;
process::Promise<mesos::slave::ContainerLimitation> limitation;