You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:33 UTC
[09/12] mesos git commit: Relocated MesosContainerizer specific files
to the correct location.
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
deleted file mode 100644
index ae53c1b..0000000
--- a/src/slave/containerizer/isolators/network/port_mapping.hpp
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __PORT_MAPPING_ISOLATOR_HPP__
-#define __PORT_MAPPING_ISOLATOR_HPP__
-
-#include <stdint.h>
-
-#include <sys/types.h>
-
-#include <set>
-#include <string>
-#include <vector>
-
-#include <process/owned.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/metrics/metrics.hpp>
-#include <process/metrics/counter.hpp>
-
-#include <stout/bytes.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/ip.hpp>
-#include <stout/interval.hpp>
-#include <stout/mac.hpp>
-#include <stout/none.hpp>
-#include <stout/option.hpp>
-#include <stout/subcommand.hpp>
-
-#include "linux/routing/filter/ip.hpp"
-
-#include "slave/flags.hpp"
-
-#include "slave/containerizer/isolator.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// The prefix this isolator uses for the virtual ethernet devices.
-// NOTE: This constant is exposed for testing.
-inline std::string PORT_MAPPING_VETH_PREFIX() { return "mesos"; }
-
-
-// The root directory where we bind mount all the namespace handles.
-// We choose the directory '/var/run/netns' so that we can use
-// iproute2 suite (e.g., ip netns show/exec) to inspect or enter the
-// network namespace. This is very useful for debugging purposes.
-// NOTE: This constant is exposed for testing.
-inline std::string PORT_MAPPING_BIND_MOUNT_ROOT() { return "/var/run/netns"; }
-
-// The root directory where we keep all the namespace handle
-// symlinks. This is introduced in 0.23.0.
-// NOTE: This constant is exposed for testing.
-inline std::string PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()
-{
- return "/var/run/mesos/netns";
-}
-
-
-// These names are used to identify the traffic control statistics
-// output for each of the Linux Traffic Control Qdiscs we report.
-constexpr char NET_ISOLATOR_BW_LIMIT[] = "bw_limit";
-constexpr char NET_ISOLATOR_BLOAT_REDUCTION[] = "bloat_reduction";
-
-
-// Responsible for allocating ephemeral ports for the port mapping
-// network isolator. This class is exposed mainly for unit testing.
-class EphemeralPortsAllocator
-{
-public:
- EphemeralPortsAllocator(
- const IntervalSet<uint16_t>& total,
- size_t _portsPerContainer)
- : free(total),
- portsPerContainer_(_portsPerContainer) {};
-
- // Returns the number of ephemeral ports for each container.
- size_t portsPerContainer() const { return portsPerContainer_; }
-
- // Allocate an ephemeral port range for a container. The allocator
- // will automatically find one port range with the given container
- // size. Returns error if the allocation cannot be fulfilled (e.g.,
- // exhausting available ephemeral ports).
- Try<Interval<uint16_t>> allocate();
-
- // Mark the specified ephemeral port range as allocated.
- void allocate(const Interval<uint16_t>& ports);
-
- // Deallocate the specified ephemeral port range.
- void deallocate(const Interval<uint16_t>& ports);
-
- // Return true if the specified ephemeral port range is managed by
- // the allocator, regardless it has been allocated to use or not.
- bool isManaged(const Interval<uint16_t>& ports)
- {
- return (free + used).contains(ports);
- }
-
-private:
- // Given an integer x, return the smallest integer t such that t >=
- // x and t % m == 0.
- static uint32_t nextMultipleOf(uint32_t x, uint32_t m);
-
- IntervalSet<uint16_t> free;
- IntervalSet<uint16_t> used;
-
- // The number of ephemeral ports for each container.
- size_t portsPerContainer_;
-};
-
-
-// For the specified ports, generate a set of port ranges each of
-// which can be used by a single IP filter. In other words, each port
-// range needs to satisfy the following two conditions: 1) the size of
-// the range is 2^n (n=0,1,2...); 2) the begin of the range is size
-// aligned (i.e., begin % size == 0). This function is exposed mainly
-// for unit testing.
-std::vector<routing::filter::ip::PortRange> getPortRanges(
- const IntervalSet<uint16_t>& ports);
-
-
-// Provides network isolation using port mapping. Each container is
-// assigned a fixed set of ports (including ephemeral ports). The
-// isolator will set up filters on the host such that network traffic
-// to the host will be properly redirected to the corresponding
-// container depending on the destination ports. The network traffic
-// from containers will also be properly relayed to the host. This
-// isolator is useful when the operator wants to reuse the host IP for
-// all containers running on the host (e.g., there are insufficient
-// IPs).
-class PortMappingIsolatorProcess : public MesosIsolatorProcess
-{
-public:
- static Try<mesos::slave::Isolator*> create(const Flags& flags);
-
- virtual ~PortMappingIsolatorProcess() {}
-
- virtual process::Future<Nothing> recover(
- const std::list<mesos::slave::ContainerState>& states,
- const hashset<ContainerID>& orphans);
-
- virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user);
-
- virtual process::Future<Nothing> isolate(
- const ContainerID& containerId,
- pid_t pid);
-
- virtual process::Future<mesos::slave::ContainerLimitation> watch(
- const ContainerID& containerId);
-
- virtual process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- virtual process::Future<Nothing> cleanup(
- const ContainerID& containerId);
-
-private:
- struct Info
- {
- Info(const IntervalSet<uint16_t>& _nonEphemeralPorts,
- const Interval<uint16_t>& _ephemeralPorts,
- const Option<pid_t>& _pid = None())
- : nonEphemeralPorts(_nonEphemeralPorts),
- ephemeralPorts(_ephemeralPorts),
- pid(_pid) {}
-
- // Non-ephemeral ports used by the container. It's possible that a
- // container does not use any non-ephemeral ports. In that case,
- // 'nonEphemeralPorts' will be empty. This variable could change
- // upon 'update'.
- IntervalSet<uint16_t> nonEphemeralPorts;
-
- // Each container has one and only one range of ephemeral ports.
- // It cannot have more than one ranges of ephemeral ports because
- // we need to setup the ip_local_port_range (which only accepts a
- // single interval) inside the container to restrict the ephemeral
- // ports used by the container.
- const Interval<uint16_t> ephemeralPorts;
-
- Option<pid_t> pid;
- Option<uint16_t> flowId;
- };
-
- // Define the metrics used by the port mapping network isolator.
- struct Metrics
- {
- Metrics();
- ~Metrics();
-
- process::metrics::Counter adding_eth0_ip_filters_errors;
- process::metrics::Counter adding_eth0_ip_filters_already_exist;
- process::metrics::Counter adding_eth0_egress_filters_errors;
- process::metrics::Counter adding_eth0_egress_filters_already_exist;
- process::metrics::Counter adding_lo_ip_filters_errors;
- process::metrics::Counter adding_lo_ip_filters_already_exist;
- process::metrics::Counter adding_veth_ip_filters_errors;
- process::metrics::Counter adding_veth_ip_filters_already_exist;
- process::metrics::Counter adding_veth_icmp_filters_errors;
- process::metrics::Counter adding_veth_icmp_filters_already_exist;
- process::metrics::Counter adding_veth_arp_filters_errors;
- process::metrics::Counter adding_veth_arp_filters_already_exist;
- process::metrics::Counter adding_eth0_icmp_filters_errors;
- process::metrics::Counter adding_eth0_icmp_filters_already_exist;
- process::metrics::Counter adding_eth0_arp_filters_errors;
- process::metrics::Counter adding_eth0_arp_filters_already_exist;
- process::metrics::Counter removing_eth0_ip_filters_errors;
- process::metrics::Counter removing_eth0_ip_filters_do_not_exist;
- process::metrics::Counter removing_eth0_egress_filters_errors;
- process::metrics::Counter removing_eth0_egress_filters_do_not_exist;
- process::metrics::Counter removing_lo_ip_filters_errors;
- process::metrics::Counter removing_lo_ip_filters_do_not_exist;
- process::metrics::Counter removing_veth_ip_filters_errors;
- process::metrics::Counter removing_veth_ip_filters_do_not_exist;
- process::metrics::Counter removing_eth0_icmp_filters_errors;
- process::metrics::Counter removing_eth0_icmp_filters_do_not_exist;
- process::metrics::Counter removing_eth0_arp_filters_errors;
- process::metrics::Counter removing_eth0_arp_filters_do_not_exist;
- process::metrics::Counter updating_eth0_icmp_filters_errors;
- process::metrics::Counter updating_eth0_icmp_filters_already_exist;
- process::metrics::Counter updating_eth0_icmp_filters_do_not_exist;
- process::metrics::Counter updating_eth0_arp_filters_errors;
- process::metrics::Counter updating_eth0_arp_filters_already_exist;
- process::metrics::Counter updating_eth0_arp_filters_do_not_exist;
- process::metrics::Counter updating_container_ip_filters_errors;
- } metrics;
-
- PortMappingIsolatorProcess(
- const Flags& _flags,
- const std::string& _eth0,
- const std::string& _lo,
- const net::MAC& _hostMAC,
- const net::IPNetwork& _hostIPNetwork,
- const size_t _hostEth0MTU,
- const net::IP& _hostDefaultGateway,
- const hashmap<std::string, std::string>& _hostNetworkConfigurations,
- const Option<Bytes>& _egressRateLimitPerContainer,
- const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
- const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
- const std::set<uint16_t>& _flowIDs)
- : flags(_flags),
- eth0(_eth0),
- lo(_lo),
- hostMAC(_hostMAC),
- hostIPNetwork(_hostIPNetwork),
- hostEth0MTU(_hostEth0MTU),
- hostDefaultGateway(_hostDefaultGateway),
- hostNetworkConfigurations(_hostNetworkConfigurations),
- egressRateLimitPerContainer(_egressRateLimitPerContainer),
- managedNonEphemeralPorts(_managedNonEphemeralPorts),
- ephemeralPortsAllocator(_ephemeralPortsAllocator),
- freeFlowIds(_flowIDs) {}
-
- // Continuations.
- Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
- Try<Info*> _recover(pid_t pid);
-
- void _update(
- const ContainerID& containerId,
- const process::Future<Option<int>>& status);
-
- process::Future<ResourceStatistics> _usage(
- const ResourceStatistics& result,
- const process::Subprocess& s);
-
- process::Future<ResourceStatistics> __usage(
- ResourceStatistics result,
- const process::Future<std::string>& out);
-
- // Helper functions.
- Try<Nothing> addHostIPFilters(
- const routing::filter::ip::PortRange& range,
- const Option<uint16_t>& flowId,
- const std::string& veth);
-
- Try<Nothing> removeHostIPFilters(
- const routing::filter::ip::PortRange& range,
- const std::string& veth,
- bool removeFiltersOnVeth = true);
-
- // Return the scripts that will be executed in the child context.
- std::string scripts(Info* info);
-
- uint16_t getNextFlowId();
-
- const Flags flags;
-
- const std::string eth0;
- const std::string lo;
- const net::MAC hostMAC;
- const net::IPNetwork hostIPNetwork;
- const size_t hostEth0MTU;
- const net::IP hostDefaultGateway;
-
- // Describe the host network configurations. It is a map between
- // configure proc files (e.g., /proc/sys/net/core/somaxconn) and
- // values of the configure proc files.
- const hashmap<std::string, std::string> hostNetworkConfigurations;
-
- // The optional throughput limit to containers' egress traffic.
- const Option<Bytes> egressRateLimitPerContainer;
-
- // All the non-ephemeral ports managed by the slave, as passed in
- // via flags.resources.
- const IntervalSet<uint16_t> managedNonEphemeralPorts;
-
- process::Owned<EphemeralPortsAllocator> ephemeralPortsAllocator;
-
- // Store a set of unused flow ID's on this slave.
- std::set<uint16_t> freeFlowIds;
-
- hashmap<ContainerID, Info*> infos;
-
- // Recovered containers from a previous run that weren't managed by
- // the network isolator.
- hashset<ContainerID> unmanaged;
-};
-
-
-// Defines the subcommand for 'update' that needs to be executed by a
-// subprocess to update the filters inside a container.
-class PortMappingUpdate : public Subcommand
-{
-public:
- static const char* NAME;
-
- struct Flags : public flags::FlagsBase
- {
- Flags();
-
- Option<std::string> eth0_name;
- Option<std::string> lo_name;
- Option<pid_t> pid;
- Option<JSON::Object> ports_to_add;
- Option<JSON::Object> ports_to_remove;
- };
-
- PortMappingUpdate() : Subcommand(NAME) {}
-
- Flags flags;
-
-protected:
- virtual int execute();
- virtual flags::FlagsBase* getFlags() { return &flags; }
-};
-
-
-// Defines the subcommand for 'statistics' that needs to be executed
-// by a subprocess to retrieve newtork statistics from inside a
-// container.
-class PortMappingStatistics : public Subcommand
-{
-public:
- static const char* NAME;
-
- struct Flags : public flags::FlagsBase
- {
- Flags();
-
- Option<std::string> eth0_name;
- Option<pid_t> pid;
- bool enable_socket_statistics_summary;
- bool enable_socket_statistics_details;
- };
-
- PortMappingStatistics() : Subcommand(NAME) {}
-
- Flags flags;
-
-protected:
- virtual int execute();
- virtual flags::FlagsBase* getFlags() { return &flags; }
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __PORT_MAPPING_ISOLATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/posix.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix.hpp b/src/slave/containerizer/isolators/posix.hpp
deleted file mode 100644
index 00df902..0000000
--- a/src/slave/containerizer/isolators/posix.hpp
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __POSIX_ISOLATOR_HPP__
-#define __POSIX_ISOLATOR_HPP__
-
-#include <process/future.hpp>
-
-#include <stout/hashmap.hpp>
-#include <stout/os.hpp>
-
-#include <stout/os/pstree.hpp>
-
-#include "slave/flags.hpp"
-
-#include "slave/containerizer/isolator.hpp"
-
-#include "usage/usage.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// A basic MesosIsolatorProcess that keeps track of the pid but
-// doesn't do any resource isolation. Subclasses must implement
-// usage() for their appropriate resource(s).
-class PosixIsolatorProcess : public MesosIsolatorProcess
-{
-public:
- virtual process::Future<Nothing> recover(
- const std::list<mesos::slave::ContainerState>& state,
- const hashset<ContainerID>& orphans)
- {
- foreach (const mesos::slave::ContainerState& run, state) {
- // This should (almost) never occur: see comment in
- // PosixLauncher::recover().
- if (pids.contains(run.container_id())) {
- return process::Failure("Container already recovered");
- }
-
- pids.put(run.container_id(), run.pid());
-
- process::Owned<process::Promise<mesos::slave::ContainerLimitation>>
- promise(new process::Promise<mesos::slave::ContainerLimitation>());
- promises.put(run.container_id(), promise);
- }
-
- return Nothing();
- }
-
- virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user)
- {
- if (promises.contains(containerId)) {
- return process::Failure("Container " + stringify(containerId) +
- " has already been prepared");
- }
-
- process::Owned<process::Promise<mesos::slave::ContainerLimitation>> promise(
- new process::Promise<mesos::slave::ContainerLimitation>());
- promises.put(containerId, promise);
-
- return None();
- }
-
- virtual process::Future<Nothing> isolate(
- const ContainerID& containerId,
- pid_t pid)
- {
- if (!promises.contains(containerId)) {
- return process::Failure("Unknown container: " + stringify(containerId));
- }
-
- pids.put(containerId, pid);
-
- return Nothing();
- }
-
- virtual process::Future<mesos::slave::ContainerLimitation> watch(
- const ContainerID& containerId)
- {
- if (!promises.contains(containerId)) {
- return process::Failure("Unknown container: " + stringify(containerId));
- }
-
- return promises[containerId]->future();
- }
-
- virtual process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources)
- {
- if (!promises.contains(containerId)) {
- return process::Failure("Unknown container: " + stringify(containerId));
- }
-
- // No resources are actually isolated so nothing to do.
- return Nothing();
- }
-
- virtual process::Future<Nothing> cleanup(const ContainerID& containerId)
- {
- if (!promises.contains(containerId)) {
- return process::Failure("Unknown container: " + stringify(containerId));
- }
-
- // TODO(idownes): We should discard the container's promise here to signal
- // to anyone that holds the future from watch().
- promises.erase(containerId);
-
- pids.erase(containerId);
-
- return Nothing();
- }
-
-protected:
- hashmap<ContainerID, pid_t> pids;
- hashmap<ContainerID,
- process::Owned<process::Promise<mesos::slave::ContainerLimitation>>>
- promises;
-};
-
-
-class PosixCpuIsolatorProcess : public PosixIsolatorProcess
-{
-public:
- static Try<mesos::slave::Isolator*> create(const Flags& flags)
- {
- process::Owned<MesosIsolatorProcess> process(
- new PosixCpuIsolatorProcess());
-
- return new MesosIsolator(process);
- }
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId)
- {
- if (!pids.contains(containerId)) {
- LOG(WARNING) << "No resource usage for unknown container '"
- << containerId << "'";
- return ResourceStatistics();
- }
-
- // Use 'mesos-usage' but only request 'cpus_' values.
- Try<ResourceStatistics> usage =
- mesos::internal::usage(pids.get(containerId).get(), false, true);
- if (usage.isError()) {
- return process::Failure(usage.error());
- }
- return usage.get();
- }
-
-private:
- PosixCpuIsolatorProcess() {}
-};
-
-
-class PosixMemIsolatorProcess : public PosixIsolatorProcess
-{
-public:
- static Try<mesos::slave::Isolator*> create(const Flags& flags)
- {
- process::Owned<MesosIsolatorProcess> process(
- new PosixMemIsolatorProcess());
-
- return new MesosIsolator(process);
- }
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId)
- {
- if (!pids.contains(containerId)) {
- LOG(WARNING) << "No resource usage for unknown container '"
- << containerId << "'";
- return ResourceStatistics();
- }
-
- // Use 'mesos-usage' but only request 'mem_' values.
- Try<ResourceStatistics> usage =
- mesos::internal::usage(pids.get(containerId).get(), true, false);
- if (usage.isError()) {
- return process::Failure(usage.error());
- }
- return usage.get();
- }
-
-private:
- PosixMemIsolatorProcess() {}
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __POSIX_ISOLATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/posix/disk.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix/disk.cpp b/src/slave/containerizer/isolators/posix/disk.cpp
deleted file mode 100644
index 73e62a2..0000000
--- a/src/slave/containerizer/isolators/posix/disk.cpp
+++ /dev/null
@@ -1,525 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <signal.h>
-
-#ifdef __linux__
-#include <sys/prctl.h>
-#endif
-#include <sys/types.h>
-
-#include <deque>
-#include <tuple>
-
-#include <glog/logging.h>
-
-#include <process/check.hpp>
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/delay.hpp>
-#include <process/io.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/lambda.hpp>
-#include <stout/numify.hpp>
-#include <stout/strings.hpp>
-
-#include <stout/os/exists.hpp>
-#include <stout/os/killtree.hpp>
-
-#include "common/protobuf_utils.hpp"
-
-#include "slave/containerizer/isolators/posix/disk.hpp"
-
-using namespace process;
-
-using std::deque;
-using std::list;
-using std::string;
-using std::vector;
-
-using mesos::slave::ContainerLimitation;
-using mesos::slave::ContainerPrepareInfo;
-using mesos::slave::ContainerState;
-using mesos::slave::Isolator;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-Try<Isolator*> PosixDiskIsolatorProcess::create(const Flags& flags)
-{
- // TODO(jieyu): Check the availability of command 'du'.
-
- return new MesosIsolator(process::Owned<MesosIsolatorProcess>(
- new PosixDiskIsolatorProcess(flags)));
-}
-
-
-PosixDiskIsolatorProcess::Info::PathInfo::~PathInfo()
-{
- usage.discard();
-}
-
-
-PosixDiskIsolatorProcess::PosixDiskIsolatorProcess(const Flags& _flags)
- : flags(_flags), collector(flags.container_disk_watch_interval) {}
-
-
-PosixDiskIsolatorProcess::~PosixDiskIsolatorProcess() {}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::recover(
- const list<ContainerState>& states,
- const hashset<ContainerID>& orphans)
-{
- foreach (const ContainerState& state, states) {
- // Since we checkpoint the executor after we create its working
- // directory, the working directory should definitely exist.
- CHECK(os::exists(state.directory()))
- << "Executor work directory " << state.directory() << " doesn't exist";
-
- infos.put(state.container_id(), Owned<Info>(new Info(state.directory())));
- }
-
- return Nothing();
-}
-
-
-Future<Option<ContainerPrepareInfo>> PosixDiskIsolatorProcess::prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user)
-{
- if (infos.contains(containerId)) {
- return Failure("Container has already been prepared");
- }
-
- infos.put(containerId, Owned<Info>(new Info(directory)));
-
- return None();
-}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::isolate(
- const ContainerID& containerId,
- pid_t pid)
-{
- if (!infos.contains(containerId)) {
- return Failure("Unknown container");
- }
-
- return Nothing();
-}
-
-
-Future<ContainerLimitation> PosixDiskIsolatorProcess::watch(
- const ContainerID& containerId)
-{
- if (!infos.contains(containerId)) {
- return Failure("Unknown container");
- }
-
- return infos[containerId]->limitation.future();
-}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::update(
- const ContainerID& containerId,
- const Resources& resources)
-{
- if (!infos.contains(containerId)) {
- LOG(WARNING) << "Ignoring update for unknown container " << containerId;
- return Nothing();
- }
-
- LOG(INFO) << "Updating the disk resources for container "
- << containerId << " to " << resources;
-
- const Owned<Info>& info = infos[containerId];
-
- // This stores the updated quotas.
- hashmap<string, Resources> quotas;
-
- foreach (const Resource& resource, resources) {
- if (resource.name() != "disk") {
- continue;
- }
-
- // The path at which we will collect disk usage and enforce quota.
- string path;
-
- // NOTE: We do not allow the case where has_disk() is true but
- // with nothing set inside DiskInfo. The master will enforce it.
- if (!resource.has_disk()) {
- // Regular disk used for executor working directory.
- path = info->directory;
- } else {
- // TODO(jieyu): Support persistent volmes as well.
- LOG(ERROR) << "Enforcing disk quota unsupported for " << resource;
- continue;
- }
-
- quotas[path] += resource;
- }
-
- // Update the quota for paths. For each new path, we also initiate
- // the disk usage collection.
- foreachpair (const string& path, const Resources& quota, quotas) {
- if (!info->paths.contains(path)) {
- info->paths[path].usage = collector.usage(path)
- .onAny(defer(
- PID<PosixDiskIsolatorProcess>(this),
- &PosixDiskIsolatorProcess::_collect,
- containerId,
- path,
- lambda::_1));
- }
-
- info->paths[path].quota = quota;
- }
-
- // Remove paths that we no longer interested in.
- foreach (const string& path, info->paths.keys()) {
- if (!quotas.contains(path)) {
- info->paths.erase(path);
- }
- }
-
- return Nothing();
-}
-
-
-void PosixDiskIsolatorProcess::_collect(
- const ContainerID& containerId,
- const string& path,
- const Future<Bytes>& future)
-{
- if (future.isDiscarded()) {
- LOG(INFO) << "Checking disk usage at '" << path << "' for container "
- << containerId << " has been cancelled";
- } else if (future.isFailed()) {
- LOG(ERROR) << "Checking disk usage at '" << path << "' for container "
- << containerId << " has failed: " << future.failure();
- }
-
- if (!infos.contains(containerId)) {
- // The container might have just been destroyed.
- return;
- }
-
- const Owned<Info>& info = infos[containerId];
-
- if (!info->paths.contains(path)) {
- // The path might have just been removed from this container's
- // resources.
- return;
- }
-
- // Check if the disk usage exceeds the quota. If yes, report the
- // limitation. We keep collecting the disk usage for 'path' by
- // initiating another round of disk usage check. The check will be
- // throttled by DiskUsageCollector.
- if (future.isReady()) {
- // Save the last disk usage.
- info->paths[path].lastUsage = future.get();
-
- if (flags.enforce_container_disk_quota) {
- Option<Bytes> quota = info->paths[path].quota.disk();
- CHECK_SOME(quota);
-
- if (future.get() > quota.get()) {
- info->limitation.set(
- protobuf::slave::createContainerLimitation(
- Resources(info->paths[path].quota),
- "Disk usage (" + stringify(future.get()) +
- ") exceeds quota (" + stringify(quota.get()) + ")",
- TaskStatus::REASON_CONTAINER_LIMITATION_DISK));
- }
- }
- }
-
- info->paths[path].usage = collector.usage(path)
- .onAny(defer(
- PID<PosixDiskIsolatorProcess>(this),
- &PosixDiskIsolatorProcess::_collect,
- containerId,
- path,
- lambda::_1));
-}
-
-
-Future<ResourceStatistics> PosixDiskIsolatorProcess::usage(
- const ContainerID& containerId)
-{
- if (!infos.contains(containerId)) {
- return Failure("Unknown container");
- }
-
- ResourceStatistics result;
-
- const Owned<Info>& info = infos[containerId];
-
- if (info->paths.contains(info->directory)) {
- Option<Bytes> quota = info->paths[info->directory].quota.disk();
- CHECK_SOME(quota);
-
- result.set_disk_limit_bytes(quota.get().bytes());
-
- // NOTE: There may be a large delay (# of containers * interval)
- // until an initial cached value is returned here!
- if (info->paths[info->directory].lastUsage.isSome()) {
- result.set_disk_used_bytes(
- info->paths[info->directory].lastUsage.get().bytes());
- }
- }
-
- return result;
-}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::cleanup(
- const ContainerID& containerId)
-{
- if (!infos.contains(containerId)) {
- LOG(WARNING) << "Ignoring cleanup for unknown container " << containerId;
- return Nothing();
- }
-
- infos.erase(containerId);
-
- return Nothing();
-}
-
-
-class DiskUsageCollectorProcess : public Process<DiskUsageCollectorProcess>
-{
-public:
- DiskUsageCollectorProcess(const Duration& _interval) : interval(_interval) {}
- virtual ~DiskUsageCollectorProcess() {}
-
- Future<Bytes> usage(const string& path)
- {
- foreach (const Owned<Entry>& entry, entries) {
- if (entry->path == path) {
- return entry->promise.future();
- }
- }
-
- entries.push_back(Owned<Entry>(new Entry(path)));
-
- // Install onDiscard callback.
- Future<Bytes> future = entries.back()->promise.future();
- future.onDiscard(defer(self(), &Self::discard, path));
-
- return future;
- }
-
-protected:
- void initialize()
- {
- schedule();
- }
-
- void finalize()
- {
- foreach (const Owned<Entry>& entry, entries) {
- if (entry->du.isSome() && entry->du.get().status().isPending()) {
- os::killtree(entry->du.get().pid(), SIGKILL);
- }
-
- entry->promise.fail("DiskUsageCollector is destroyed");
- }
- }
-
-private:
- // Describe a single pending check.
- struct Entry
- {
- explicit Entry(const string& _path) : path(_path) {}
-
- string path;
- Option<Subprocess> du;
- Promise<Bytes> promise;
- };
-
- // This function is invoked right before each 'du' is exec'ed. Note
- // that this function needs to be async signal safe.
- static int setupChild()
- {
-#ifdef __linux__
- // Kill the child process if the parent exits.
- // NOTE: This function should never returns non-zero because we
- // are passing in a valid signal.
- return ::prctl(PR_SET_PDEATHSIG, SIGKILL);
-#else
- return 0;
-#endif
- }
-
- void discard(const string& path)
- {
- for (auto it = entries.begin(); it != entries.end(); ++it) {
- // We only cancel those checks whose 'du' haven't been launched.
- if ((*it)->path == path && (*it)->du.isNone()) {
- (*it)->promise.discard();
- entries.erase(it);
- break;
- }
- }
- }
-
- // Schedule a 'du' to be invoked. The current implementation does
- // not allow multiple 'du's running concurrently. The minimal
- // interval between two subsequent 'du's is controlled by 'interval'
- // for throttling purpose.
- void schedule()
- {
- if (entries.empty()) {
- delay(interval, self(), &Self::schedule);
- return;
- }
-
- const Owned<Entry>& entry = entries.front();
-
- // Invoke 'du' and report number of 1K-byte blocks. We fix the
- // block size here so that we can get consistent results on all
- // platforms (e.g., OS X uses 512 byte blocks).
- //
- // NOTE: The 'du' processes are run in the slave's cgroup and it
- // will be that cgroup that is charged for (a) memory to cache the
- // fs data structures, (b) disk I/O to read those structures, and
- // (c) the cpu time to traverse.
- Try<Subprocess> s = subprocess(
- "du",
- vector<string>({"du", "-k", "-s", entry->path}),
- Subprocess::PATH("/dev/null"),
- Subprocess::PIPE(),
- Subprocess::PIPE(),
- None(),
- None(),
- setupChild);
-
- if (s.isError()) {
- entry->promise.fail("Failed to exec 'du': " + s.error());
-
- entries.pop_front();
- delay(interval, self(), &Self::schedule);
- return;
- }
-
- entry->du = s.get();
-
- await(s.get().status(),
- io::read(s.get().out().get()),
- io::read(s.get().err().get()))
- .onAny(defer(self(), &Self::_schedule, lambda::_1));
- }
-
- void _schedule(const Future<std::tuple<
- Future<Option<int>>,
- Future<string>,
- Future<string>>>& future)
- {
- CHECK_READY(future);
- CHECK(!entries.empty());
-
- const Owned<Entry>& entry = entries.front();
- CHECK_SOME(entry->du);
-
- Future<Option<int>> status = std::get<0>(future.get());
-
- if (!status.isReady()) {
- entry->promise.fail(
- "Failed to perform 'du': " +
- (status.isFailed() ? status.failure() : "discarded"));
- } else if (status.get().isNone()) {
- entry->promise.fail("Failed to reap the status of 'du'");
- } else if (status.get().get() != 0) {
- Future<string> error = std::get<2>(future.get());
- if (!error.isReady()) {
- entry->promise.fail(
- "Failed to perform 'du'. Reading stderr failed: " +
- (error.isFailed() ? error.failure() : "discarded"));
- } else {
- entry->promise.fail("Failed to perform 'du': " + error.get());
- }
- } else {
- Future<string> output = std::get<1>(future.get());
- if (!output.isReady()) {
- entry->promise.fail(
- "Failed to read stdout from 'du': " +
- (output.isFailed() ? output.failure() : "discarded"));
- } else {
- // Parsing the output from 'du'. The following is a sample
- // output. Tab is used as the delimiter between the number of
- // blocks and the checked path.
- // $ du /var/lib/mesos/.../runs/container_id
- // 1024 /var/lib/mesos/.../runs/container_id
- vector<string> tokens = strings::tokenize(output.get(), " \t");
- if (tokens.empty()) {
- entry->promise.fail("Unexpected output from 'du': " + output.get());
- } else {
- Try<size_t> value = numify<size_t>(tokens[0]);
- if (value.isError()) {
- entry->promise.fail("Unexpected output from 'du': " + output.get());
- } else {
- // Notify the callers.
- entry->promise.set(Kilobytes(value.get()));
- }
- }
- }
- }
-
- entries.pop_front();
- delay(interval, self(), &Self::schedule);
- }
-
- const Duration interval;
-
- // A queue of pending checks.
- deque<Owned<Entry>> entries;
-};
-
-
-DiskUsageCollector::DiskUsageCollector(const Duration& interval)
-{
- process = new DiskUsageCollectorProcess(interval);
- spawn(process);
-}
-
-
-DiskUsageCollector::~DiskUsageCollector()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<Bytes> DiskUsageCollector::usage(const string& path)
-{
- return dispatch(process, &DiskUsageCollectorProcess::usage, path);
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/posix/disk.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix/disk.hpp b/src/slave/containerizer/isolators/posix/disk.hpp
deleted file mode 100644
index 85df5d2..0000000
--- a/src/slave/containerizer/isolators/posix/disk.hpp
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __POSIX_DISK_ISOLATOR_HPP__
-#define __POSIX_DISK_ISOLATOR_HPP__
-
-#include <string>
-
-#include <process/owned.hpp>
-
-#include <stout/bytes.hpp>
-#include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
-
-#include "slave/flags.hpp"
-#include "slave/state.hpp"
-
-#include "slave/containerizer/isolator.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declarations.
-class DiskUsageCollectorProcess;
-
-
-// Responsible for collecting disk usage for paths, while ensuring
-// that an interval elapses between each collection.
-class DiskUsageCollector
-{
-public:
- DiskUsageCollector(const Duration& interval);
- ~DiskUsageCollector();
-
- // Returns the disk usage rooted at 'path'. The user can discard the
- // returned future to cancel the check.
- process::Future<Bytes> usage(const std::string& path);
-
-private:
- DiskUsageCollectorProcess* process;
-};
-
-
-// This isolator monitors the disk usage for containers, and reports
-// ContainerLimitation when a container exceeds its disk quota. This
-// leverages the DiskUsageCollector to ensure that we don't induce too
-// much CPU usage and disk caching effects from running 'du' too
-// often.
-//
-// NOTE: Currently all containers are processed in the same queue,
-// which means that when a container starts, it could take many disk
-// collection intervals until any data is available in the resource
-// usage statistics!
-//
-// TODO(jieyu): Consider handling each container independently, or
-// triggering an initial collection when the container starts, to
-// ensure that we have usage statistics without a large delay.
-class PosixDiskIsolatorProcess : public MesosIsolatorProcess
-{
-public:
- static Try<mesos::slave::Isolator*> create(const Flags& flags);
-
- virtual ~PosixDiskIsolatorProcess();
-
- virtual process::Future<Nothing> recover(
- const std::list<mesos::slave::ContainerState>& states,
- const hashset<ContainerID>& orphans);
-
- virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user);
-
- virtual process::Future<Nothing> isolate(
- const ContainerID& containerId,
- pid_t pid);
-
- virtual process::Future<mesos::slave::ContainerLimitation> watch(
- const ContainerID& containerId);
-
- virtual process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- virtual process::Future<Nothing> cleanup(
- const ContainerID& containerId);
-
-private:
- PosixDiskIsolatorProcess(const Flags& flags);
-
- void _collect(
- const ContainerID& containerId,
- const std::string& path,
- const process::Future<Bytes>& future);
-
- const Flags flags;
- DiskUsageCollector collector;
-
- struct Info
- {
- explicit Info(const std::string& _directory) : directory(_directory) {}
-
- // We save executor working directory here so that we know where
- // to collect disk usage for disk resources without DiskInfo.
- const std::string directory;
-
- process::Promise<mesos::slave::ContainerLimitation> limitation;
-
- // The keys of the hashmaps contain the executor working directory
- // above, and optionally paths of volumes used by the container.
- // For each path, we maintain its quota and its last usage.
- struct PathInfo
- {
- ~PathInfo();
-
- Resources quota;
- process::Future<Bytes> usage;
- Option<Bytes> lastUsage;
- };
-
- hashmap<std::string, PathInfo> paths;
- };
-
- hashmap<ContainerID, process::Owned<Info>> infos;
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __POSIX_DISK_ISOLATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/linux_launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.cpp b/src/slave/containerizer/linux_launcher.cpp
index c03b89e..654f0e1 100644
--- a/src/slave/containerizer/linux_launcher.cpp
+++ b/src/slave/containerizer/linux_launcher.cpp
@@ -40,7 +40,7 @@
#include "slave/containerizer/linux_launcher.hpp"
-#include "slave/containerizer/isolators/namespaces/pid.hpp"
+#include "slave/containerizer/mesos/isolators/namespaces/pid.hpp"
using namespace process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index d1fc5a4..0664d00 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -47,36 +47,35 @@
#include "slave/containerizer/linux_launcher.hpp"
#endif
-#include "slave/containerizer/isolators/posix.hpp"
+#include "slave/containerizer/mesos/isolators/posix.hpp"
-#include "slave/containerizer/isolators/posix/disk.hpp"
+#include "slave/containerizer/mesos/isolators/posix/disk.hpp"
#ifdef __linux__
-#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
-#include "slave/containerizer/isolators/cgroups/mem.hpp"
-#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/mem.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/perf_event.hpp"
#endif
#ifdef __linux__
-#include "slave/containerizer/isolators/filesystem/linux.hpp"
+#include "slave/containerizer/mesos/isolators/filesystem/linux.hpp"
#endif
-#include "slave/containerizer/isolators/filesystem/posix.hpp"
+#include "slave/containerizer/mesos/isolators/filesystem/posix.hpp"
#ifdef __linux__
-#include "slave/containerizer/isolators/filesystem/shared.hpp"
+#include "slave/containerizer/mesos/isolators/filesystem/shared.hpp"
#endif
#ifdef __linux__
-#include "slave/containerizer/isolators/namespaces/pid.hpp"
+#include "slave/containerizer/mesos/isolators/namespaces/pid.hpp"
#endif
#ifdef WITH_NETWORK_ISOLATOR
-#include "slave/containerizer/isolators/network/port_mapping.hpp"
+#include "slave/containerizer/mesos/isolators/network/port_mapping.hpp"
#endif
#include "slave/containerizer/mesos/containerizer.hpp"
#include "slave/containerizer/mesos/launch.hpp"
-
-#include "slave/containerizer/provisioner/provisioner.hpp"
+#include "slave/containerizer/mesos/provisioner/provisioner.hpp"
using std::list;
using std::map;
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
new file mode 100644
index 0000000..db9dde0
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CGROUPS_ISOLATOR_CONSTANTS_HPP__
+#define __CGROUPS_ISOLATOR_CONSTANTS_HPP__
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// CPU subsystem constants.
+const uint64_t CPU_SHARES_PER_CPU = 1024;
+const uint64_t CPU_SHARES_PER_CPU_REVOCABLE = 10;
+const uint64_t MIN_CPU_SHARES = 2; // Linux constant.
+const Duration CPU_CFS_PERIOD = Milliseconds(100); // Linux default.
+const Duration MIN_CPU_CFS_QUOTA = Milliseconds(1);
+
+
+// Memory subsystem constants.
+const Bytes MIN_MEMORY = Megabytes(32);
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CGROUPS_ISOLATOR_CONSTANTS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp
new file mode 100644
index 0000000..84a64e5
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdint.h>
+
+#include <mesos/type_utils.hpp>
+#include <mesos/values.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/pid.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/check.hpp>
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "linux/cgroups.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
+
+using namespace process;
+
+using std::list;
+using std::set;
+using std::string;
+using std::vector;
+
+using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerPrepareInfo;
+using mesos::slave::ContainerState;
+using mesos::slave::Isolator;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+CgroupsCpushareIsolatorProcess::CgroupsCpushareIsolatorProcess(
+ const Flags& _flags,
+ const hashmap<string, string>& _hierarchies,
+ const vector<string>& _subsystems)
+ : flags(_flags),
+ hierarchies(_hierarchies),
+ subsystems(_subsystems) {}
+
+
+CgroupsCpushareIsolatorProcess::~CgroupsCpushareIsolatorProcess() {}
+
+
+Try<Isolator*> CgroupsCpushareIsolatorProcess::create(const Flags& flags)
+{
+ Try<string> hierarchyCpu = cgroups::prepare(
+ flags.cgroups_hierarchy,
+ "cpu",
+ flags.cgroups_root);
+
+ if (hierarchyCpu.isError()) {
+ return Error(
+ "Failed to prepare hierarchy for cpu subsystem: " +
+ hierarchyCpu.error());
+ }
+
+ Try<string> hierarchyCpuacct = cgroups::prepare(
+ flags.cgroups_hierarchy,
+ "cpuacct",
+ flags.cgroups_root);
+
+ if (hierarchyCpuacct.isError()) {
+ return Error(
+ "Failed to prepare hierarchy for cpuacct subsystem: " +
+ hierarchyCpuacct.error());
+ }
+
+ hashmap<string, string> hierarchies;
+ vector<string> subsystems;
+
+ hierarchies["cpu"] = hierarchyCpu.get();
+ hierarchies["cpuacct"] = hierarchyCpuacct.get();
+
+ if (hierarchyCpu.get() == hierarchyCpuacct.get()) {
+ // Subsystem cpu and cpuacct are co-mounted (e.g., systemd).
+ hierarchies["cpu,cpuacct"] = hierarchyCpu.get();
+ subsystems.push_back("cpu,cpuacct");
+
+ // Ensure that no other subsystem is attached to the hierarchy.
+ Try<set<string>> _subsystems = cgroups::subsystems(hierarchyCpu.get());
+ if (_subsystems.isError()) {
+ return Error(
+ "Failed to get the list of attached subsystems for hierarchy " +
+ hierarchyCpu.get());
+ } else if (_subsystems.get().size() != 2) {
+ return Error(
+ "Unexpected subsystems found attached to the hierarchy " +
+ hierarchyCpu.get());
+ }
+ } else {
+ // Subsystem cpu and cpuacct are mounted separately.
+ subsystems.push_back("cpu");
+ subsystems.push_back("cpuacct");
+
+ // Ensure that no other subsystem is attached to each of the
+ // hierarchy.
+ Try<set<string>> _subsystems = cgroups::subsystems(hierarchyCpu.get());
+ if (_subsystems.isError()) {
+ return Error(
+ "Failed to get the list of attached subsystems for hierarchy " +
+ hierarchyCpu.get());
+ } else if (_subsystems.get().size() != 1) {
+ return Error(
+ "Unexpected subsystems found attached to the hierarchy " +
+ hierarchyCpu.get());
+ }
+
+ _subsystems = cgroups::subsystems(hierarchyCpuacct.get());
+ if (_subsystems.isError()) {
+ return Error(
+ "Failed to get the list of attached subsystems for hierarchy " +
+ hierarchyCpuacct.get());
+ } else if (_subsystems.get().size() != 1) {
+ return Error(
+ "Unexpected subsystems found attached to the hierarchy " +
+ hierarchyCpuacct.get());
+ }
+ }
+
+ if (flags.cgroups_enable_cfs) {
+ Try<bool> exists = cgroups::exists(
+ hierarchies["cpu"],
+ flags.cgroups_root,
+ "cpu.cfs_quota_us");
+
+ if (exists.isError() || !exists.get()) {
+ return Error(
+ "Failed to find 'cpu.cfs_quota_us'. Your kernel "
+ "might be too old to use the CFS cgroups feature.");
+ }
+ }
+
+ process::Owned<MesosIsolatorProcess> process(
+ new CgroupsCpushareIsolatorProcess(flags, hierarchies, subsystems));
+
+ return new MesosIsolator(process);
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::recover(
+ const list<ContainerState>& states,
+ const hashset<ContainerID>& orphans)
+{
+ foreach (const ContainerState& state, states) {
+ const ContainerID& containerId = state.container_id();
+ const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+ Try<bool> exists = cgroups::exists(hierarchies["cpu"], cgroup);
+ if (exists.isError()) {
+ foreachvalue (Info* info, infos) {
+ delete info;
+ }
+ infos.clear();
+ return Failure(
+ "Failed to check cgroup for container " + stringify(containerId));
+ }
+
+ if (!exists.get()) {
+ // This may occur if the executor has exited and the isolator
+ // has destroyed the cgroup but the slave dies before noticing
+ // this. This will be detected when the containerizer tries to
+ // monitor the executor's pid.
+ LOG(WARNING) << "Couldn't find cgroup for container " << containerId;
+ continue;
+ }
+
+ infos[containerId] = new Info(containerId, cgroup);
+ }
+
+ // Remove orphan cgroups.
+ foreach (const string& subsystem, subsystems) {
+ Try<vector<string>> cgroups = cgroups::get(
+ hierarchies[subsystem],
+ flags.cgroups_root);
+
+ if (cgroups.isError()) {
+ foreachvalue (Info* info, infos) {
+ delete info;
+ }
+ infos.clear();
+ return Failure(cgroups.error());
+ }
+
+ foreach (const string& cgroup, cgroups.get()) {
+ // Ignore the slave cgroup (see the --slave_subsystems flag).
+ // TODO(idownes): Remove this when the cgroups layout is
+ // updated, see MESOS-1185.
+ if (cgroup == path::join(flags.cgroups_root, "slave")) {
+ continue;
+ }
+
+ ContainerID containerId;
+ containerId.set_value(Path(cgroup).basename());
+
+ if (infos.contains(containerId)) {
+ continue;
+ }
+
+ // Known orphan cgroups will be destroyed by the containerizer
+ // using the normal cleanup path. See MESOS-2367 for details.
+ if (orphans.contains(containerId)) {
+ infos[containerId] = new Info(containerId, cgroup);
+ continue;
+ }
+
+ LOG(INFO) << "Removing unknown orphaned cgroup '"
+ << path::join(subsystem, cgroup) << "'";
+
+ // We don't wait on the destroy as we don't want to block recovery.
+ cgroups::destroy(
+ hierarchies[subsystem],
+ cgroup,
+ cgroups::DESTROY_TIMEOUT);
+ }
+ }
+
+ return Nothing();
+}
+
+
+Future<Option<ContainerPrepareInfo>> CgroupsCpushareIsolatorProcess::prepare(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user)
+{
+ if (infos.contains(containerId)) {
+ return Failure("Container has already been prepared");
+ }
+
+ // TODO(bmahler): Don't insert into 'infos' unless we create the
+ // cgroup successfully. It's safe for now because 'cleanup' gets
+ // called if we return a Failure, but cleanup will fail because the
+ // cgroup does not exist when cgroups::destroy is called.
+ Info* info = new Info(
+ containerId, path::join(flags.cgroups_root, containerId.value()));
+
+ infos[containerId] = info;
+
+ foreach (const string& subsystem, subsystems) {
+ Try<bool> exists = cgroups::exists(hierarchies[subsystem], info->cgroup);
+ if (exists.isError()) {
+ return Failure("Failed to prepare isolator: " + exists.error());
+ } else if (exists.get()) {
+ return Failure("Failed to prepare isolator: cgroup already exists");
+ }
+
+ Try<Nothing> create = cgroups::create(hierarchies[subsystem], info->cgroup);
+ if (create.isError()) {
+ return Failure("Failed to prepare isolator: " + create.error());
+ }
+
+ // Chown the cgroup so the executor can create nested cgroups. Do
+ // not recurse so the control files are still owned by the slave
+ // user and thus cannot be changed by the executor.
+ if (user.isSome()) {
+ Try<Nothing> chown = os::chown(
+ user.get(),
+ path::join(hierarchies[subsystem], info->cgroup),
+ false);
+ if (chown.isError()) {
+ return Failure("Failed to prepare isolator: " + chown.error());
+ }
+ }
+ }
+
+ return update(containerId, executorInfo.resources())
+ .then([]() -> Future<Option<ContainerPrepareInfo>> {
+ return None();
+ });
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::isolate(
+ const ContainerID& containerId,
+ pid_t pid)
+{
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ Info* info = CHECK_NOTNULL(infos[containerId]);
+
+ CHECK_NONE(info->pid);
+ info->pid = pid;
+
+ foreach (const string& subsystem, subsystems) {
+ Try<Nothing> assign = cgroups::assign(
+ hierarchies[subsystem],
+ info->cgroup,
+ pid);
+
+ if (assign.isError()) {
+ LOG(ERROR) << "Failed to assign container '" << info->containerId
+ << " to its own cgroup '"
+ << path::join(hierarchies[subsystem], info->cgroup)
+ << "' : " << assign.error();
+
+ return Failure("Failed to isolate container: " + assign.error());
+ }
+ }
+
+ return Nothing();
+}
+
+
+Future<ContainerLimitation> CgroupsCpushareIsolatorProcess::watch(
+ const ContainerID& containerId)
+{
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ CHECK_NOTNULL(infos[containerId]);
+
+ return infos[containerId]->limitation.future();
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::update(
+ const ContainerID& containerId,
+ const Resources& resources)
+{
+ if (resources.cpus().isNone()) {
+ return Failure("No cpus resource given");
+ }
+
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ const Option<string>& hierarchy = hierarchies.get("cpu");
+ if (hierarchy.isNone()) {
+ return Failure("No 'cpu' hierarchy");
+ }
+
+ Info* info = CHECK_NOTNULL(infos[containerId]);
+ info->resources = resources;
+
+ double cpus = resources.cpus().get();
+
+ // Always set cpu.shares.
+ uint64_t shares;
+
+ if (flags.revocable_cpu_low_priority &&
+ resources.revocable().cpus().isSome()) {
+ shares = std::max(
+ (uint64_t) (CPU_SHARES_PER_CPU_REVOCABLE * cpus),
+ MIN_CPU_SHARES);
+ } else {
+ shares = std::max(
+ (uint64_t) (CPU_SHARES_PER_CPU * cpus),
+ MIN_CPU_SHARES);
+ }
+
+ Try<Nothing> write = cgroups::cpu::shares(
+ hierarchy.get(),
+ info->cgroup,
+ shares);
+
+ if (write.isError()) {
+ return Failure("Failed to update 'cpu.shares': " + write.error());
+ }
+
+ LOG(INFO) << "Updated 'cpu.shares' to " << shares
+ << " (cpus " << cpus << ")"
+ << " for container " << containerId;
+
+ // Set cfs quota if enabled.
+ if (flags.cgroups_enable_cfs) {
+ write = cgroups::cpu::cfs_period_us(
+ hierarchy.get(),
+ info->cgroup,
+ CPU_CFS_PERIOD);
+
+ if (write.isError()) {
+ return Failure("Failed to update 'cpu.cfs_period_us': " + write.error());
+ }
+
+ Duration quota = std::max(CPU_CFS_PERIOD * cpus, MIN_CPU_CFS_QUOTA);
+
+ write = cgroups::cpu::cfs_quota_us(hierarchy.get(), info->cgroup, quota);
+ if (write.isError()) {
+ return Failure("Failed to update 'cpu.cfs_quota_us': " + write.error());
+ }
+
+ LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
+ << " and 'cpu.cfs_quota_us' to " << quota
+ << " (cpus " << cpus << ")"
+ << " for container " << containerId;
+ }
+
+ return Nothing();
+}
+
+
+Future<ResourceStatistics> CgroupsCpushareIsolatorProcess::usage(
+ const ContainerID& containerId)
+{
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ Info* info = CHECK_NOTNULL(infos[containerId]);
+
+ ResourceStatistics result;
+
+ // TODO(chzhcn): Getting the number of processes and threads is
+ // available as long as any cgroup subsystem is used so this best
+ // not be tied to a specific cgroup isolator. A better place is
+ // probably Linux Launcher, which uses the cgroup freezer subsystem.
+ // That requires some change for it to adopt the new semantics of
+ // reporting subsystem-independent cgroup usage.
+ // Note: The complexity of this operation is linear to the number of
+ // processes and threads in a container: the kernel has to allocate
+ // memory to contain the list of pids or tids; the userspace has to
+ // parse the cgroup files to get the size. If this proves to be a
+ // performance bottleneck, some kind of rate limiting mechanism
+ // needs to be employed.
+ if (flags.cgroups_cpu_enable_pids_and_tids_count) {
+ Try<std::set<pid_t>> pids =
+ cgroups::processes(hierarchies["cpuacct"], info->cgroup);
+ if (pids.isError()) {
+ return Failure("Failed to get number of processes: " + pids.error());
+ }
+
+ result.set_processes(pids.get().size());
+
+ Try<std::set<pid_t>> tids =
+ cgroups::threads(hierarchies["cpuacct"], info->cgroup);
+ if (tids.isError()) {
+ return Failure("Failed to get number of threads: " + tids.error());
+ }
+
+ result.set_threads(tids.get().size());
+ }
+
+ // Get the number of clock ticks, used for cpu accounting.
+ static long ticks = sysconf(_SC_CLK_TCK);
+
+ PCHECK(ticks > 0) << "Failed to get sysconf(_SC_CLK_TCK)";
+
+ // Add the cpuacct.stat information.
+ Try<hashmap<string, uint64_t>> stat = cgroups::stat(
+ hierarchies["cpuacct"],
+ info->cgroup,
+ "cpuacct.stat");
+
+ if (stat.isError()) {
+ return Failure("Failed to read cpuacct.stat: " + stat.error());
+ }
+
+ // TODO(bmahler): Add namespacing to cgroups to enforce the expected
+ // structure, e.g., cgroups::cpuacct::stat.
+ Option<uint64_t> user = stat.get().get("user");
+ Option<uint64_t> system = stat.get().get("system");
+
+ if (user.isSome() && system.isSome()) {
+ result.set_cpus_user_time_secs((double) user.get() / (double) ticks);
+ result.set_cpus_system_time_secs((double) system.get() / (double) ticks);
+ }
+
+ // Add the cpu.stat information only if CFS is enabled.
+ if (flags.cgroups_enable_cfs) {
+ stat = cgroups::stat(hierarchies["cpu"], info->cgroup, "cpu.stat");
+ if (stat.isError()) {
+ return Failure("Failed to read cpu.stat: " + stat.error());
+ }
+
+ Option<uint64_t> nr_periods = stat.get().get("nr_periods");
+ if (nr_periods.isSome()) {
+ result.set_cpus_nr_periods(nr_periods.get());
+ }
+
+ Option<uint64_t> nr_throttled = stat.get().get("nr_throttled");
+ if (nr_throttled.isSome()) {
+ result.set_cpus_nr_throttled(nr_throttled.get());
+ }
+
+ Option<uint64_t> throttled_time = stat.get().get("throttled_time");
+ if (throttled_time.isSome()) {
+ result.set_cpus_throttled_time_secs(
+ Nanoseconds(throttled_time.get()).secs());
+ }
+ }
+
+ return result;
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::cleanup(
+ const ContainerID& containerId)
+{
+ // Multiple calls may occur during test clean up.
+ if (!infos.contains(containerId)) {
+ VLOG(1) << "Ignoring cleanup request for unknown container: "
+ << containerId;
+
+ return Nothing();
+ }
+
+ Info* info = CHECK_NOTNULL(infos[containerId]);
+
+ list<Future<Nothing>> futures;
+ foreach (const string& subsystem, subsystems) {
+ futures.push_back(cgroups::destroy(
+ hierarchies[subsystem],
+ info->cgroup,
+ cgroups::DESTROY_TIMEOUT));
+ }
+
+ return collect(futures)
+ .onAny(defer(PID<CgroupsCpushareIsolatorProcess>(this),
+ &CgroupsCpushareIsolatorProcess::_cleanup,
+ containerId,
+ lambda::_1))
+ .then([]() { return Nothing(); });
+}
+
+
+Future<list<Nothing>> CgroupsCpushareIsolatorProcess::_cleanup(
+ const ContainerID& containerId,
+ const Future<list<Nothing>>& future)
+{
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ CHECK_NOTNULL(infos[containerId]);
+
+ if (!future.isReady()) {
+ return Failure(
+ "Failed to clean up container " + stringify(containerId) +
+ " : " + (future.isFailed() ? future.failure() : "discarded"));
+ }
+
+ delete infos[containerId];
+ infos.erase(containerId);
+
+ return future;
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp
new file mode 100644
index 0000000..4ce37bf
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CPUSHARE_ISOLATOR_HPP__
+#define __CPUSHARE_ISOLATOR_HPP__
+
+#include <sys/types.h>
+
+#include <string>
+#include <vector>
+
+#include <process/future.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+
+#include "slave/flags.hpp"
+
+#include "slave/containerizer/isolator.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Use the Linux cpu cgroup controller for cpu isolation which uses the
+// Completely Fair Scheduler (CFS).
+// - cpushare implements proportionally weighted scheduling.
+// - cfs implements hard quota based scheduling.
+class CgroupsCpushareIsolatorProcess : public MesosIsolatorProcess
+{
+public:
+ static Try<mesos::slave::Isolator*> create(const Flags& flags);
+
+ virtual ~CgroupsCpushareIsolatorProcess();
+
+ virtual process::Future<Nothing> recover(
+ const std::list<mesos::slave::ContainerState>& states,
+ const hashset<ContainerID>& orphans);
+
+ virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user);
+
+ virtual process::Future<Nothing> isolate(
+ const ContainerID& containerId,
+ pid_t pid);
+
+ virtual process::Future<mesos::slave::ContainerLimitation> watch(
+ const ContainerID& containerId);
+
+ virtual process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ virtual process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ virtual process::Future<Nothing> cleanup(
+ const ContainerID& containerId);
+
+private:
+ CgroupsCpushareIsolatorProcess(
+ const Flags& flags,
+ const hashmap<std::string, std::string>& hierarchies,
+ const std::vector<std::string>& subsystems);
+
+ virtual process::Future<std::list<Nothing>> _cleanup(
+ const ContainerID& containerId,
+ const process::Future<std::list<Nothing>>& future);
+
+ struct Info
+ {
+ Info(const ContainerID& _containerId, const std::string& _cgroup)
+ : containerId(_containerId), cgroup(_cgroup) {}
+
+ const ContainerID containerId;
+ const std::string cgroup;
+ Option<pid_t> pid;
+ Option<Resources> resources;
+
+ process::Promise<mesos::slave::ContainerLimitation> limitation;
+ };
+
+ const Flags flags;
+
+ // Map from subsystem to hierarchy.
+ hashmap<std::string, std::string> hierarchies;
+
+ // Subsystems used for this isolator. Typically, there are two
+ // elements in the vector: 'cpu' and 'cpuacct'. If cpu and cpuacct
+ // systems are co-mounted (e.g., systems using systemd), then there
+ // will be only one element in the vector which is 'cpu,cpuacct'.
+ std::vector<std::string> subsystems;
+
+ // TODO(bmahler): Use Owned<Info>.
+ hashmap<ContainerID, Info*> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CPUSHARE_ISOLATOR_HPP__