You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:33 UTC

[09/12] mesos git commit: Relocated MesosContainerizer specific files to the correct location.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
deleted file mode 100644
index ae53c1b..0000000
--- a/src/slave/containerizer/isolators/network/port_mapping.hpp
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __PORT_MAPPING_ISOLATOR_HPP__
-#define __PORT_MAPPING_ISOLATOR_HPP__
-
-#include <stdint.h>
-
-#include <sys/types.h>
-
-#include <set>
-#include <string>
-#include <vector>
-
-#include <process/owned.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/metrics/metrics.hpp>
-#include <process/metrics/counter.hpp>
-
-#include <stout/bytes.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/ip.hpp>
-#include <stout/interval.hpp>
-#include <stout/mac.hpp>
-#include <stout/none.hpp>
-#include <stout/option.hpp>
-#include <stout/subcommand.hpp>
-
-#include "linux/routing/filter/ip.hpp"
-
-#include "slave/flags.hpp"
-
-#include "slave/containerizer/isolator.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// The prefix this isolator uses for the virtual ethernet devices.
-// NOTE: This constant is exposed for testing.
-inline std::string PORT_MAPPING_VETH_PREFIX() { return "mesos"; }
-
-
-// The root directory where we bind mount all the namespace handles.
-// We choose the directory '/var/run/netns' so that we can use
-// iproute2 suite (e.g., ip netns show/exec) to inspect or enter the
-// network namespace. This is very useful for debugging purposes.
-// NOTE: This constant is exposed for testing.
-inline std::string PORT_MAPPING_BIND_MOUNT_ROOT() { return "/var/run/netns"; }
-
-// The root directory where we keep all the namespace handle
-// symlinks. This is introduced in 0.23.0.
-// NOTE: This constant is exposed for testing.
-inline std::string PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()
-{
-  return "/var/run/mesos/netns";
-}
-
-
-// These names are used to identify the traffic control statistics
-// output for each of the Linux Traffic Control Qdiscs we report.
-constexpr char NET_ISOLATOR_BW_LIMIT[] = "bw_limit";
-constexpr char NET_ISOLATOR_BLOAT_REDUCTION[] = "bloat_reduction";
-
-
-// Responsible for allocating ephemeral ports for the port mapping
-// network isolator. This class is exposed mainly for unit testing.
-class EphemeralPortsAllocator
-{
-public:
-  EphemeralPortsAllocator(
-      const IntervalSet<uint16_t>& total,
-      size_t _portsPerContainer)
-    : free(total),
-      portsPerContainer_(_portsPerContainer) {};
-
-  // Returns the number of ephemeral ports for each container.
-  size_t portsPerContainer() const { return portsPerContainer_; }
-
-  // Allocate an ephemeral port range for a container. The allocator
-  // will automatically find one port range with the given container
-  // size. Returns error if the allocation cannot be fulfilled (e.g.,
-  // exhausting available ephemeral ports).
-  Try<Interval<uint16_t>> allocate();
-
-  // Mark the specified ephemeral port range as allocated.
-  void allocate(const Interval<uint16_t>& ports);
-
-  // Deallocate the specified ephemeral port range.
-  void deallocate(const Interval<uint16_t>& ports);
-
-  // Return true if the specified ephemeral port range is managed by
-  // the allocator, regardless of whether it has been allocated or not.
-  bool isManaged(const Interval<uint16_t>& ports)
-  {
-    return (free + used).contains(ports);
-  }
-
-private:
-  // Given an integer x, return the smallest integer t such that t >=
-  // x and t % m == 0.
-  static uint32_t nextMultipleOf(uint32_t x, uint32_t m);
-
-  IntervalSet<uint16_t> free;
-  IntervalSet<uint16_t> used;
-
-  // The number of ephemeral ports for each container.
-  size_t portsPerContainer_;
-};
-
-
-// For the specified ports, generate a set of port ranges each of
-// which can be used by a single IP filter. In other words, each port
-// range needs to satisfy the following two conditions: 1) the size of
-// the range is 2^n (n=0,1,2...); 2) the begin of the range is size
-// aligned (i.e., begin % size == 0). This function is exposed mainly
-// for unit testing.
-std::vector<routing::filter::ip::PortRange> getPortRanges(
-    const IntervalSet<uint16_t>& ports);
-
-
-// Provides network isolation using port mapping. Each container is
-// assigned a fixed set of ports (including ephemeral ports). The
-// isolator will set up filters on the host such that network traffic
-// to the host will be properly redirected to the corresponding
-// container depending on the destination ports. The network traffic
-// from containers will also be properly relayed to the host. This
-// isolator is useful when the operator wants to reuse the host IP for
-// all containers running on the host (e.g., there are insufficient
-// IPs).
-class PortMappingIsolatorProcess : public MesosIsolatorProcess
-{
-public:
-  static Try<mesos::slave::Isolator*> create(const Flags& flags);
-
-  virtual ~PortMappingIsolatorProcess() {}
-
-  virtual process::Future<Nothing> recover(
-      const std::list<mesos::slave::ContainerState>& states,
-      const hashset<ContainerID>& orphans);
-
-  virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user);
-
-  virtual process::Future<Nothing> isolate(
-      const ContainerID& containerId,
-      pid_t pid);
-
-  virtual process::Future<mesos::slave::ContainerLimitation> watch(
-      const ContainerID& containerId);
-
-  virtual process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  virtual process::Future<Nothing> cleanup(
-      const ContainerID& containerId);
-
-private:
-  struct Info
-  {
-    Info(const IntervalSet<uint16_t>& _nonEphemeralPorts,
-         const Interval<uint16_t>& _ephemeralPorts,
-         const Option<pid_t>& _pid = None())
-      : nonEphemeralPorts(_nonEphemeralPorts),
-        ephemeralPorts(_ephemeralPorts),
-        pid(_pid) {}
-
-    // Non-ephemeral ports used by the container. It's possible that a
-    // container does not use any non-ephemeral ports. In that case,
-    // 'nonEphemeralPorts' will be empty. This variable could change
-    // upon 'update'.
-    IntervalSet<uint16_t> nonEphemeralPorts;
-
-    // Each container has one and only one range of ephemeral ports.
-    // It cannot have more than one range of ephemeral ports because
-    // we need to setup the ip_local_port_range (which only accepts a
-    // single interval) inside the container to restrict the ephemeral
-    // ports used by the container.
-    const Interval<uint16_t> ephemeralPorts;
-
-    Option<pid_t> pid;
-    Option<uint16_t> flowId;
-  };
-
-  // Define the metrics used by the port mapping network isolator.
-  struct Metrics
-  {
-    Metrics();
-    ~Metrics();
-
-    process::metrics::Counter adding_eth0_ip_filters_errors;
-    process::metrics::Counter adding_eth0_ip_filters_already_exist;
-    process::metrics::Counter adding_eth0_egress_filters_errors;
-    process::metrics::Counter adding_eth0_egress_filters_already_exist;
-    process::metrics::Counter adding_lo_ip_filters_errors;
-    process::metrics::Counter adding_lo_ip_filters_already_exist;
-    process::metrics::Counter adding_veth_ip_filters_errors;
-    process::metrics::Counter adding_veth_ip_filters_already_exist;
-    process::metrics::Counter adding_veth_icmp_filters_errors;
-    process::metrics::Counter adding_veth_icmp_filters_already_exist;
-    process::metrics::Counter adding_veth_arp_filters_errors;
-    process::metrics::Counter adding_veth_arp_filters_already_exist;
-    process::metrics::Counter adding_eth0_icmp_filters_errors;
-    process::metrics::Counter adding_eth0_icmp_filters_already_exist;
-    process::metrics::Counter adding_eth0_arp_filters_errors;
-    process::metrics::Counter adding_eth0_arp_filters_already_exist;
-    process::metrics::Counter removing_eth0_ip_filters_errors;
-    process::metrics::Counter removing_eth0_ip_filters_do_not_exist;
-    process::metrics::Counter removing_eth0_egress_filters_errors;
-    process::metrics::Counter removing_eth0_egress_filters_do_not_exist;
-    process::metrics::Counter removing_lo_ip_filters_errors;
-    process::metrics::Counter removing_lo_ip_filters_do_not_exist;
-    process::metrics::Counter removing_veth_ip_filters_errors;
-    process::metrics::Counter removing_veth_ip_filters_do_not_exist;
-    process::metrics::Counter removing_eth0_icmp_filters_errors;
-    process::metrics::Counter removing_eth0_icmp_filters_do_not_exist;
-    process::metrics::Counter removing_eth0_arp_filters_errors;
-    process::metrics::Counter removing_eth0_arp_filters_do_not_exist;
-    process::metrics::Counter updating_eth0_icmp_filters_errors;
-    process::metrics::Counter updating_eth0_icmp_filters_already_exist;
-    process::metrics::Counter updating_eth0_icmp_filters_do_not_exist;
-    process::metrics::Counter updating_eth0_arp_filters_errors;
-    process::metrics::Counter updating_eth0_arp_filters_already_exist;
-    process::metrics::Counter updating_eth0_arp_filters_do_not_exist;
-    process::metrics::Counter updating_container_ip_filters_errors;
-  } metrics;
-
-  PortMappingIsolatorProcess(
-      const Flags& _flags,
-      const std::string& _eth0,
-      const std::string& _lo,
-      const net::MAC& _hostMAC,
-      const net::IPNetwork& _hostIPNetwork,
-      const size_t _hostEth0MTU,
-      const net::IP& _hostDefaultGateway,
-      const hashmap<std::string, std::string>& _hostNetworkConfigurations,
-      const Option<Bytes>& _egressRateLimitPerContainer,
-      const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
-      const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
-      const std::set<uint16_t>& _flowIDs)
-    : flags(_flags),
-      eth0(_eth0),
-      lo(_lo),
-      hostMAC(_hostMAC),
-      hostIPNetwork(_hostIPNetwork),
-      hostEth0MTU(_hostEth0MTU),
-      hostDefaultGateway(_hostDefaultGateway),
-      hostNetworkConfigurations(_hostNetworkConfigurations),
-      egressRateLimitPerContainer(_egressRateLimitPerContainer),
-      managedNonEphemeralPorts(_managedNonEphemeralPorts),
-      ephemeralPortsAllocator(_ephemeralPortsAllocator),
-      freeFlowIds(_flowIDs) {}
-
-  // Continuations.
-  Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
-  Try<Info*> _recover(pid_t pid);
-
-  void _update(
-      const ContainerID& containerId,
-      const process::Future<Option<int>>& status);
-
-  process::Future<ResourceStatistics> _usage(
-      const ResourceStatistics& result,
-      const process::Subprocess& s);
-
-  process::Future<ResourceStatistics> __usage(
-      ResourceStatistics result,
-      const process::Future<std::string>& out);
-
-  // Helper functions.
-  Try<Nothing> addHostIPFilters(
-      const routing::filter::ip::PortRange& range,
-      const Option<uint16_t>& flowId,
-      const std::string& veth);
-
-  Try<Nothing> removeHostIPFilters(
-      const routing::filter::ip::PortRange& range,
-      const std::string& veth,
-      bool removeFiltersOnVeth = true);
-
-  // Return the scripts that will be executed in the child context.
-  std::string scripts(Info* info);
-
-  uint16_t getNextFlowId();
-
-  const Flags flags;
-
-  const std::string eth0;
-  const std::string lo;
-  const net::MAC hostMAC;
-  const net::IPNetwork hostIPNetwork;
-  const size_t hostEth0MTU;
-  const net::IP hostDefaultGateway;
-
-  // Describe the host network configurations. It is a map between
-  // configure proc files (e.g., /proc/sys/net/core/somaxconn) and
-  // values of the configure proc files.
-  const hashmap<std::string, std::string> hostNetworkConfigurations;
-
-  // The optional throughput limit to containers' egress traffic.
-  const Option<Bytes> egressRateLimitPerContainer;
-
-  // All the non-ephemeral ports managed by the slave, as passed in
-  // via flags.resources.
-  const IntervalSet<uint16_t> managedNonEphemeralPorts;
-
-  process::Owned<EphemeralPortsAllocator> ephemeralPortsAllocator;
-
-  // Store a set of unused flow IDs on this slave.
-  std::set<uint16_t> freeFlowIds;
-
-  hashmap<ContainerID, Info*> infos;
-
-  // Recovered containers from a previous run that weren't managed by
-  // the network isolator.
-  hashset<ContainerID> unmanaged;
-};
-
-
-// Defines the subcommand for 'update' that needs to be executed by a
-// subprocess to update the filters inside a container.
-class PortMappingUpdate : public Subcommand
-{
-public:
-  static const char* NAME;
-
-  struct Flags : public flags::FlagsBase
-  {
-    Flags();
-
-    Option<std::string> eth0_name;
-    Option<std::string> lo_name;
-    Option<pid_t> pid;
-    Option<JSON::Object> ports_to_add;
-    Option<JSON::Object> ports_to_remove;
-  };
-
-  PortMappingUpdate() : Subcommand(NAME) {}
-
-  Flags flags;
-
-protected:
-  virtual int execute();
-  virtual flags::FlagsBase* getFlags() { return &flags; }
-};
-
-
-// Defines the subcommand for 'statistics' that needs to be executed
-// by a subprocess to retrieve network statistics from inside a
-// container.
-class PortMappingStatistics : public Subcommand
-{
-public:
-  static const char* NAME;
-
-  struct Flags : public flags::FlagsBase
-  {
-    Flags();
-
-    Option<std::string> eth0_name;
-    Option<pid_t> pid;
-    bool enable_socket_statistics_summary;
-    bool enable_socket_statistics_details;
-  };
-
-  PortMappingStatistics() : Subcommand(NAME) {}
-
-  Flags flags;
-
-protected:
-  virtual int execute();
-  virtual flags::FlagsBase* getFlags() { return &flags; }
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __PORT_MAPPING_ISOLATOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/posix.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix.hpp b/src/slave/containerizer/isolators/posix.hpp
deleted file mode 100644
index 00df902..0000000
--- a/src/slave/containerizer/isolators/posix.hpp
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __POSIX_ISOLATOR_HPP__
-#define __POSIX_ISOLATOR_HPP__
-
-#include <process/future.hpp>
-
-#include <stout/hashmap.hpp>
-#include <stout/os.hpp>
-
-#include <stout/os/pstree.hpp>
-
-#include "slave/flags.hpp"
-
-#include "slave/containerizer/isolator.hpp"
-
-#include "usage/usage.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// A basic MesosIsolatorProcess that keeps track of the pid but
-// doesn't do any resource isolation. Subclasses must implement
-// usage() for their appropriate resource(s).
-class PosixIsolatorProcess : public MesosIsolatorProcess
-{
-public:
-  virtual process::Future<Nothing> recover(
-      const std::list<mesos::slave::ContainerState>& state,
-      const hashset<ContainerID>& orphans)
-  {
-    foreach (const mesos::slave::ContainerState& run, state) {
-      // This should (almost) never occur: see comment in
-      // PosixLauncher::recover().
-      if (pids.contains(run.container_id())) {
-        return process::Failure("Container already recovered");
-      }
-
-      pids.put(run.container_id(), run.pid());
-
-      process::Owned<process::Promise<mesos::slave::ContainerLimitation>>
-        promise(new process::Promise<mesos::slave::ContainerLimitation>());
-      promises.put(run.container_id(), promise);
-    }
-
-    return Nothing();
-  }
-
-  virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user)
-  {
-    if (promises.contains(containerId)) {
-      return process::Failure("Container " + stringify(containerId) +
-                              " has already been prepared");
-    }
-
-    process::Owned<process::Promise<mesos::slave::ContainerLimitation>> promise(
-        new process::Promise<mesos::slave::ContainerLimitation>());
-    promises.put(containerId, promise);
-
-    return None();
-  }
-
-  virtual process::Future<Nothing> isolate(
-      const ContainerID& containerId,
-      pid_t pid)
-  {
-    if (!promises.contains(containerId)) {
-      return process::Failure("Unknown container: " + stringify(containerId));
-    }
-
-    pids.put(containerId, pid);
-
-    return Nothing();
-  }
-
-  virtual process::Future<mesos::slave::ContainerLimitation> watch(
-      const ContainerID& containerId)
-  {
-    if (!promises.contains(containerId)) {
-      return process::Failure("Unknown container: " + stringify(containerId));
-    }
-
-    return promises[containerId]->future();
-  }
-
-  virtual process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources)
-  {
-    if (!promises.contains(containerId)) {
-      return process::Failure("Unknown container: " + stringify(containerId));
-    }
-
-    // No resources are actually isolated so nothing to do.
-    return Nothing();
-  }
-
-  virtual process::Future<Nothing> cleanup(const ContainerID& containerId)
-  {
-    if (!promises.contains(containerId)) {
-      return process::Failure("Unknown container: " + stringify(containerId));
-    }
-
-    // TODO(idownes): We should discard the container's promise here to signal
-    // to anyone that holds the future from watch().
-    promises.erase(containerId);
-
-    pids.erase(containerId);
-
-    return Nothing();
-  }
-
-protected:
-  hashmap<ContainerID, pid_t> pids;
-  hashmap<ContainerID,
-          process::Owned<process::Promise<mesos::slave::ContainerLimitation>>>
-    promises;
-};
-
-
-class PosixCpuIsolatorProcess : public PosixIsolatorProcess
-{
-public:
-  static Try<mesos::slave::Isolator*> create(const Flags& flags)
-  {
-    process::Owned<MesosIsolatorProcess> process(
-        new PosixCpuIsolatorProcess());
-
-    return new MesosIsolator(process);
-  }
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId)
-  {
-    if (!pids.contains(containerId)) {
-      LOG(WARNING) << "No resource usage for unknown container '"
-                   << containerId << "'";
-      return ResourceStatistics();
-    }
-
-    // Use 'mesos-usage' but only request 'cpus_' values.
-    Try<ResourceStatistics> usage =
-      mesos::internal::usage(pids.get(containerId).get(), false, true);
-    if (usage.isError()) {
-      return process::Failure(usage.error());
-    }
-    return usage.get();
-  }
-
-private:
-  PosixCpuIsolatorProcess() {}
-};
-
-
-class PosixMemIsolatorProcess : public PosixIsolatorProcess
-{
-public:
-  static Try<mesos::slave::Isolator*> create(const Flags& flags)
-  {
-    process::Owned<MesosIsolatorProcess> process(
-        new PosixMemIsolatorProcess());
-
-    return new MesosIsolator(process);
-  }
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId)
-  {
-    if (!pids.contains(containerId)) {
-      LOG(WARNING) << "No resource usage for unknown container '"
-                   << containerId << "'";
-      return ResourceStatistics();
-    }
-
-    // Use 'mesos-usage' but only request 'mem_' values.
-    Try<ResourceStatistics> usage =
-      mesos::internal::usage(pids.get(containerId).get(), true, false);
-    if (usage.isError()) {
-      return process::Failure(usage.error());
-    }
-    return usage.get();
-  }
-
-private:
-  PosixMemIsolatorProcess() {}
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __POSIX_ISOLATOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/posix/disk.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix/disk.cpp b/src/slave/containerizer/isolators/posix/disk.cpp
deleted file mode 100644
index 73e62a2..0000000
--- a/src/slave/containerizer/isolators/posix/disk.cpp
+++ /dev/null
@@ -1,525 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <signal.h>
-
-#ifdef __linux__
-#include <sys/prctl.h>
-#endif
-#include <sys/types.h>
-
-#include <deque>
-#include <tuple>
-
-#include <glog/logging.h>
-
-#include <process/check.hpp>
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/delay.hpp>
-#include <process/io.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/lambda.hpp>
-#include <stout/numify.hpp>
-#include <stout/strings.hpp>
-
-#include <stout/os/exists.hpp>
-#include <stout/os/killtree.hpp>
-
-#include "common/protobuf_utils.hpp"
-
-#include "slave/containerizer/isolators/posix/disk.hpp"
-
-using namespace process;
-
-using std::deque;
-using std::list;
-using std::string;
-using std::vector;
-
-using mesos::slave::ContainerLimitation;
-using mesos::slave::ContainerPrepareInfo;
-using mesos::slave::ContainerState;
-using mesos::slave::Isolator;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-Try<Isolator*> PosixDiskIsolatorProcess::create(const Flags& flags)
-{
-  // TODO(jieyu): Check the availability of command 'du'.
-
-  return new MesosIsolator(process::Owned<MesosIsolatorProcess>(
-        new PosixDiskIsolatorProcess(flags)));
-}
-
-
-PosixDiskIsolatorProcess::Info::PathInfo::~PathInfo()
-{
-  usage.discard();
-}
-
-
-PosixDiskIsolatorProcess::PosixDiskIsolatorProcess(const Flags& _flags)
-  : flags(_flags), collector(flags.container_disk_watch_interval) {}
-
-
-PosixDiskIsolatorProcess::~PosixDiskIsolatorProcess() {}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::recover(
-    const list<ContainerState>& states,
-    const hashset<ContainerID>& orphans)
-{
-  foreach (const ContainerState& state, states) {
-    // Since we checkpoint the executor after we create its working
-    // directory, the working directory should definitely exist.
-    CHECK(os::exists(state.directory()))
-      << "Executor work directory " << state.directory() << " doesn't exist";
-
-    infos.put(state.container_id(), Owned<Info>(new Info(state.directory())));
-  }
-
-  return Nothing();
-}
-
-
-Future<Option<ContainerPrepareInfo>> PosixDiskIsolatorProcess::prepare(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  if (infos.contains(containerId)) {
-    return Failure("Container has already been prepared");
-  }
-
-  infos.put(containerId, Owned<Info>(new Info(directory)));
-
-  return None();
-}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::isolate(
-    const ContainerID& containerId,
-    pid_t pid)
-{
-  if (!infos.contains(containerId)) {
-    return Failure("Unknown container");
-  }
-
-  return Nothing();
-}
-
-
-Future<ContainerLimitation> PosixDiskIsolatorProcess::watch(
-    const ContainerID& containerId)
-{
-  if (!infos.contains(containerId)) {
-    return Failure("Unknown container");
-  }
-
-  return infos[containerId]->limitation.future();
-}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::update(
-    const ContainerID& containerId,
-    const Resources& resources)
-{
-  if (!infos.contains(containerId)) {
-    LOG(WARNING) << "Ignoring update for unknown container " << containerId;
-    return Nothing();
-  }
-
-  LOG(INFO) << "Updating the disk resources for container "
-            << containerId << " to " << resources;
-
-  const Owned<Info>& info = infos[containerId];
-
-  // This stores the updated quotas.
-  hashmap<string, Resources> quotas;
-
-  foreach (const Resource& resource, resources) {
-    if (resource.name() != "disk") {
-      continue;
-    }
-
-    // The path at which we will collect disk usage and enforce quota.
-    string path;
-
-    // NOTE: We do not allow the case where has_disk() is true but
-    // with nothing set inside DiskInfo. The master will enforce it.
-    if (!resource.has_disk()) {
-      // Regular disk used for executor working directory.
-      path = info->directory;
-    } else {
-      // TODO(jieyu): Support persistent volumes as well.
-      LOG(ERROR) << "Enforcing disk quota unsupported for " << resource;
-      continue;
-    }
-
-    quotas[path] += resource;
-  }
-
-  // Update the quota for paths. For each new path, we also initiate
-  // the disk usage collection.
-  foreachpair (const string& path, const Resources& quota, quotas) {
-    if (!info->paths.contains(path)) {
-      info->paths[path].usage = collector.usage(path)
-        .onAny(defer(
-            PID<PosixDiskIsolatorProcess>(this),
-            &PosixDiskIsolatorProcess::_collect,
-            containerId,
-            path,
-            lambda::_1));
-    }
-
-    info->paths[path].quota = quota;
-  }
-
-  // Remove paths that we are no longer interested in.
-  foreach (const string& path, info->paths.keys()) {
-    if (!quotas.contains(path)) {
-      info->paths.erase(path);
-    }
-  }
-
-  return Nothing();
-}
-
-
-void PosixDiskIsolatorProcess::_collect(
-    const ContainerID& containerId,
-    const string& path,
-    const Future<Bytes>& future)
-{
-  if (future.isDiscarded()) {
-    LOG(INFO) << "Checking disk usage at '" << path << "' for container "
-              << containerId << " has been cancelled";
-  } else if (future.isFailed()) {
-    LOG(ERROR) << "Checking disk usage at '" << path << "' for container "
-               << containerId << " has failed: " << future.failure();
-  }
-
-  if (!infos.contains(containerId)) {
-    // The container might have just been destroyed.
-    return;
-  }
-
-  const Owned<Info>& info = infos[containerId];
-
-  if (!info->paths.contains(path)) {
-    // The path might have just been removed from this container's
-    // resources.
-    return;
-  }
-
-  // Check if the disk usage exceeds the quota. If yes, report the
-  // limitation. We keep collecting the disk usage for 'path' by
-  // initiating another round of disk usage check. The check will be
-  // throttled by DiskUsageCollector.
-  if (future.isReady()) {
-    // Save the last disk usage.
-    info->paths[path].lastUsage = future.get();
-
-    if (flags.enforce_container_disk_quota) {
-      Option<Bytes> quota = info->paths[path].quota.disk();
-      CHECK_SOME(quota);
-
-      if (future.get() > quota.get()) {
-        info->limitation.set(
-            protobuf::slave::createContainerLimitation(
-                Resources(info->paths[path].quota),
-                "Disk usage (" + stringify(future.get()) +
-                ") exceeds quota (" + stringify(quota.get()) + ")",
-                TaskStatus::REASON_CONTAINER_LIMITATION_DISK));
-      }
-    }
-  }
-
-  info->paths[path].usage = collector.usage(path)
-    .onAny(defer(
-        PID<PosixDiskIsolatorProcess>(this),
-        &PosixDiskIsolatorProcess::_collect,
-        containerId,
-        path,
-        lambda::_1));
-}
-
-
-Future<ResourceStatistics> PosixDiskIsolatorProcess::usage(
-    const ContainerID& containerId)
-{
-  if (!infos.contains(containerId)) {
-    return Failure("Unknown container");
-  }
-
-  ResourceStatistics result;
-
-  const Owned<Info>& info = infos[containerId];
-
-  if (info->paths.contains(info->directory)) {
-    Option<Bytes> quota = info->paths[info->directory].quota.disk();
-    CHECK_SOME(quota);
-
-    result.set_disk_limit_bytes(quota.get().bytes());
-
-    // NOTE: There may be a large delay (# of containers * interval)
-    // until an initial cached value is returned here!
-    if (info->paths[info->directory].lastUsage.isSome()) {
-      result.set_disk_used_bytes(
-          info->paths[info->directory].lastUsage.get().bytes());
-    }
-  }
-
-  return result;
-}
-
-
-Future<Nothing> PosixDiskIsolatorProcess::cleanup(
-    const ContainerID& containerId)
-{
-  if (!infos.contains(containerId)) {
-    LOG(WARNING) << "Ignoring cleanup for unknown container " << containerId;
-    return Nothing();
-  }
-
-  infos.erase(containerId);
-
-  return Nothing();
-}
-
-
-class DiskUsageCollectorProcess : public Process<DiskUsageCollectorProcess>
-{
-public:
-  DiskUsageCollectorProcess(const Duration& _interval) : interval(_interval) {}
-  virtual ~DiskUsageCollectorProcess() {}
-
-  Future<Bytes> usage(const string& path)
-  {
-    foreach (const Owned<Entry>& entry, entries) {
-      if (entry->path == path) {
-        return entry->promise.future();
-      }
-    }
-
-    entries.push_back(Owned<Entry>(new Entry(path)));
-
-    // Install onDiscard callback.
-    Future<Bytes> future = entries.back()->promise.future();
-    future.onDiscard(defer(self(), &Self::discard, path));
-
-    return future;
-  }
-
-protected:
-  void initialize()
-  {
-    schedule();
-  }
-
-  void finalize()
-  {
-    foreach (const Owned<Entry>& entry, entries) {
-      if (entry->du.isSome() && entry->du.get().status().isPending()) {
-        os::killtree(entry->du.get().pid(), SIGKILL);
-      }
-
-      entry->promise.fail("DiskUsageCollector is destroyed");
-    }
-  }
-
-private:
-  // Describe a single pending check.
-  struct Entry
-  {
-    explicit Entry(const string& _path) : path(_path) {}
-
-    string path;
-    Option<Subprocess> du;
-    Promise<Bytes> promise;
-  };
-
-  // This function is invoked right before each 'du' is exec'ed. Note
-  // that this function needs to be async signal safe.
-  static int setupChild()
-  {
-#ifdef __linux__
-    // Kill the child process if the parent exits.
-    // NOTE: This function should never returns non-zero because we
-    // are passing in a valid signal.
-    return ::prctl(PR_SET_PDEATHSIG, SIGKILL);
-#else
-    return 0;
-#endif
-  }
-
-  void discard(const string& path)
-  {
-    for (auto it = entries.begin(); it != entries.end(); ++it) {
-      // We only cancel those checks whose 'du' haven't been launched.
-      if ((*it)->path == path && (*it)->du.isNone()) {
-        (*it)->promise.discard();
-        entries.erase(it);
-        break;
-      }
-    }
-  }
-
-  // Schedule a 'du' to be invoked. The current implementation does
-  // not allow multiple 'du's running concurrently. The minimal
-  // interval between two subsequent 'du's is controlled by 'interval'
-  // for throttling purpose.
-  void schedule()
-  {
-    if (entries.empty()) {
-      delay(interval, self(), &Self::schedule);
-      return;
-    }
-
-    const Owned<Entry>& entry = entries.front();
-
-    // Invoke 'du' and report number of 1K-byte blocks. We fix the
-    // block size here so that we can get consistent results on all
-    // platforms (e.g., OS X uses 512 byte blocks).
-    //
-    // NOTE: The 'du' processes are run in the slave's cgroup and it
-    // will be that cgroup that is charged for (a) memory to cache the
-    // fs data structures, (b) disk I/O to read those structures, and
-    // (c) the cpu time to traverse.
-    Try<Subprocess> s = subprocess(
-        "du",
-        vector<string>({"du", "-k", "-s", entry->path}),
-        Subprocess::PATH("/dev/null"),
-        Subprocess::PIPE(),
-        Subprocess::PIPE(),
-        None(),
-        None(),
-        setupChild);
-
-    if (s.isError()) {
-      entry->promise.fail("Failed to exec 'du': " + s.error());
-
-      entries.pop_front();
-      delay(interval, self(), &Self::schedule);
-      return;
-    }
-
-    entry->du = s.get();
-
-    await(s.get().status(),
-          io::read(s.get().out().get()),
-          io::read(s.get().err().get()))
-      .onAny(defer(self(), &Self::_schedule, lambda::_1));
-  }
-
-  void _schedule(const Future<std::tuple<
-      Future<Option<int>>,
-      Future<string>,
-      Future<string>>>& future)
-  {
-    CHECK_READY(future);
-    CHECK(!entries.empty());
-
-    const Owned<Entry>& entry = entries.front();
-    CHECK_SOME(entry->du);
-
-    Future<Option<int>> status = std::get<0>(future.get());
-
-    if (!status.isReady()) {
-      entry->promise.fail(
-          "Failed to perform 'du': " +
-          (status.isFailed() ? status.failure() : "discarded"));
-    } else if (status.get().isNone()) {
-      entry->promise.fail("Failed to reap the status of 'du'");
-    } else if (status.get().get() != 0) {
-      Future<string> error = std::get<2>(future.get());
-      if (!error.isReady()) {
-        entry->promise.fail(
-            "Failed to perform 'du'. Reading stderr failed: " +
-            (error.isFailed() ? error.failure() : "discarded"));
-      } else {
-        entry->promise.fail("Failed to perform 'du': " + error.get());
-      }
-    } else {
-      Future<string> output = std::get<1>(future.get());
-      if (!output.isReady()) {
-        entry->promise.fail(
-            "Failed to read stdout from 'du': " +
-            (output.isFailed() ? output.failure() : "discarded"));
-      } else {
-        // Parsing the output from 'du'. The following is a sample
-        // output. Tab is used as the delimiter between the number of
-        // blocks and the checked path.
-        // $ du /var/lib/mesos/.../runs/container_id
-        // 1024   /var/lib/mesos/.../runs/container_id
-        vector<string> tokens = strings::tokenize(output.get(), " \t");
-        if (tokens.empty()) {
-          entry->promise.fail("Unexpected output from 'du': " + output.get());
-        } else {
-          Try<size_t> value = numify<size_t>(tokens[0]);
-          if (value.isError()) {
-            entry->promise.fail("Unexpected output from 'du': " + output.get());
-          } else {
-            // Notify the callers.
-            entry->promise.set(Kilobytes(value.get()));
-          }
-        }
-      }
-    }
-
-    entries.pop_front();
-    delay(interval, self(), &Self::schedule);
-  }
-
-  const Duration interval;
-
-  // A queue of pending checks.
-  deque<Owned<Entry>> entries;
-};
-
-
-DiskUsageCollector::DiskUsageCollector(const Duration& interval)
-{
-  process = new DiskUsageCollectorProcess(interval);
-  spawn(process);
-}
-
-
-DiskUsageCollector::~DiskUsageCollector()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<Bytes> DiskUsageCollector::usage(const string& path)
-{
-  return dispatch(process, &DiskUsageCollectorProcess::usage, path);
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/isolators/posix/disk.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix/disk.hpp b/src/slave/containerizer/isolators/posix/disk.hpp
deleted file mode 100644
index 85df5d2..0000000
--- a/src/slave/containerizer/isolators/posix/disk.hpp
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __POSIX_DISK_ISOLATOR_HPP__
-#define __POSIX_DISK_ISOLATOR_HPP__
-
-#include <string>
-
-#include <process/owned.hpp>
-
-#include <stout/bytes.hpp>
-#include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
-
-#include "slave/flags.hpp"
-#include "slave/state.hpp"
-
-#include "slave/containerizer/isolator.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declarations.
-class DiskUsageCollectorProcess;
-
-
-// Responsible for collecting disk usage for paths, while ensuring
-// that an interval elapses between each collection.
-class DiskUsageCollector
-{
-public:
-  DiskUsageCollector(const Duration& interval);
-  ~DiskUsageCollector();
-
-  // Returns the disk usage rooted at 'path'. The user can discard the
-  // returned future to cancel the check.
-  process::Future<Bytes> usage(const std::string& path);
-
-private:
-  DiskUsageCollectorProcess* process;
-};
-
-
-// This isolator monitors the disk usage for containers, and reports
-// ContainerLimitation when a container exceeds its disk quota. This
-// leverages the DiskUsageCollector to ensure that we don't induce too
-// much CPU usage and disk caching effects from running 'du' too
-// often.
-//
-// NOTE: Currently all containers are processed in the same queue,
-// which means that when a container starts, it could take many disk
-// collection intervals until any data is available in the resource
-// usage statistics!
-//
-// TODO(jieyu): Consider handling each container independently, or
-// triggering an initial collection when the container starts, to
-// ensure that we have usage statistics without a large delay.
-class PosixDiskIsolatorProcess : public MesosIsolatorProcess
-{
-public:
-  static Try<mesos::slave::Isolator*> create(const Flags& flags);
-
-  virtual ~PosixDiskIsolatorProcess();
-
-  virtual process::Future<Nothing> recover(
-      const std::list<mesos::slave::ContainerState>& states,
-      const hashset<ContainerID>& orphans);
-
-  virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user);
-
-  virtual process::Future<Nothing> isolate(
-      const ContainerID& containerId,
-      pid_t pid);
-
-  virtual process::Future<mesos::slave::ContainerLimitation> watch(
-      const ContainerID& containerId);
-
-  virtual process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  virtual process::Future<Nothing> cleanup(
-      const ContainerID& containerId);
-
-private:
-  PosixDiskIsolatorProcess(const Flags& flags);
-
-  void _collect(
-      const ContainerID& containerId,
-      const std::string& path,
-      const process::Future<Bytes>& future);
-
-  const Flags flags;
-  DiskUsageCollector collector;
-
-  struct Info
-  {
-    explicit Info(const std::string& _directory) : directory(_directory) {}
-
-    // We save executor working directory here so that we know where
-    // to collect disk usage for disk resources without DiskInfo.
-    const std::string directory;
-
-    process::Promise<mesos::slave::ContainerLimitation> limitation;
-
-    // The keys of the hashmaps contain the executor working directory
-    // above, and optionally paths of volumes used by the container.
-    // For each path, we maintain its quota and its last usage.
-    struct PathInfo
-    {
-      ~PathInfo();
-
-      Resources quota;
-      process::Future<Bytes> usage;
-      Option<Bytes> lastUsage;
-    };
-
-    hashmap<std::string, PathInfo> paths;
-  };
-
-  hashmap<ContainerID, process::Owned<Info>> infos;
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __POSIX_DISK_ISOLATOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/linux_launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.cpp b/src/slave/containerizer/linux_launcher.cpp
index c03b89e..654f0e1 100644
--- a/src/slave/containerizer/linux_launcher.cpp
+++ b/src/slave/containerizer/linux_launcher.cpp
@@ -40,7 +40,7 @@
 
 #include "slave/containerizer/linux_launcher.hpp"
 
-#include "slave/containerizer/isolators/namespaces/pid.hpp"
+#include "slave/containerizer/mesos/isolators/namespaces/pid.hpp"
 
 using namespace process;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index d1fc5a4..0664d00 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -47,36 +47,35 @@
 #include "slave/containerizer/linux_launcher.hpp"
 #endif
 
-#include "slave/containerizer/isolators/posix.hpp"
+#include "slave/containerizer/mesos/isolators/posix.hpp"
 
-#include "slave/containerizer/isolators/posix/disk.hpp"
+#include "slave/containerizer/mesos/isolators/posix/disk.hpp"
 
 #ifdef __linux__
-#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
-#include "slave/containerizer/isolators/cgroups/mem.hpp"
-#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/mem.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/perf_event.hpp"
 #endif
 
 #ifdef __linux__
-#include "slave/containerizer/isolators/filesystem/linux.hpp"
+#include "slave/containerizer/mesos/isolators/filesystem/linux.hpp"
 #endif
-#include "slave/containerizer/isolators/filesystem/posix.hpp"
+#include "slave/containerizer/mesos/isolators/filesystem/posix.hpp"
 #ifdef __linux__
-#include "slave/containerizer/isolators/filesystem/shared.hpp"
+#include "slave/containerizer/mesos/isolators/filesystem/shared.hpp"
 #endif
 
 #ifdef __linux__
-#include "slave/containerizer/isolators/namespaces/pid.hpp"
+#include "slave/containerizer/mesos/isolators/namespaces/pid.hpp"
 #endif
 
 #ifdef WITH_NETWORK_ISOLATOR
-#include "slave/containerizer/isolators/network/port_mapping.hpp"
+#include "slave/containerizer/mesos/isolators/network/port_mapping.hpp"
 #endif
 
 #include "slave/containerizer/mesos/containerizer.hpp"
 #include "slave/containerizer/mesos/launch.hpp"
-
-#include "slave/containerizer/provisioner/provisioner.hpp"
+#include "slave/containerizer/mesos/provisioner/provisioner.hpp"
 
 using std::list;
 using std::map;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
new file mode 100644
index 0000000..db9dde0
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CGROUPS_ISOLATOR_CONSTANTS_HPP__
+#define __CGROUPS_ISOLATOR_CONSTANTS_HPP__
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// CPU subsystem constants.
+const uint64_t CPU_SHARES_PER_CPU = 1024;
+const uint64_t CPU_SHARES_PER_CPU_REVOCABLE = 10;
+const uint64_t MIN_CPU_SHARES = 2; // Linux constant.
+const Duration CPU_CFS_PERIOD = Milliseconds(100); // Linux default.
+const Duration MIN_CPU_CFS_QUOTA = Milliseconds(1);
+
+
+// Memory subsystem constants.
+const Bytes MIN_MEMORY = Megabytes(32);
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CGROUPS_ISOLATOR_CONSTANTS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp
new file mode 100644
index 0000000..84a64e5
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.cpp
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdint.h>
+
+#include <mesos/type_utils.hpp>
+#include <mesos/values.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/pid.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/check.hpp>
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "linux/cgroups.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
+
+using namespace process;
+
+using std::list;
+using std::set;
+using std::string;
+using std::vector;
+
+using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerPrepareInfo;
+using mesos::slave::ContainerState;
+using mesos::slave::Isolator;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+CgroupsCpushareIsolatorProcess::CgroupsCpushareIsolatorProcess(
+    const Flags& _flags,
+    const hashmap<string, string>& _hierarchies,
+    const vector<string>& _subsystems)
+  : flags(_flags),
+    hierarchies(_hierarchies),
+    subsystems(_subsystems) {}
+
+
+CgroupsCpushareIsolatorProcess::~CgroupsCpushareIsolatorProcess() {}
+
+
+Try<Isolator*> CgroupsCpushareIsolatorProcess::create(const Flags& flags)
+{
+  Try<string> hierarchyCpu = cgroups::prepare(
+        flags.cgroups_hierarchy,
+        "cpu",
+        flags.cgroups_root);
+
+  if (hierarchyCpu.isError()) {
+    return Error(
+        "Failed to prepare hierarchy for cpu subsystem: " +
+        hierarchyCpu.error());
+  }
+
+  Try<string> hierarchyCpuacct = cgroups::prepare(
+        flags.cgroups_hierarchy,
+        "cpuacct",
+        flags.cgroups_root);
+
+  if (hierarchyCpuacct.isError()) {
+    return Error(
+        "Failed to prepare hierarchy for cpuacct subsystem: " +
+        hierarchyCpuacct.error());
+  }
+
+  hashmap<string, string> hierarchies;
+  vector<string> subsystems;
+
+  hierarchies["cpu"] = hierarchyCpu.get();
+  hierarchies["cpuacct"] = hierarchyCpuacct.get();
+
+  if (hierarchyCpu.get() == hierarchyCpuacct.get()) {
+    // Subsystem cpu and cpuacct are co-mounted (e.g., systemd).
+    hierarchies["cpu,cpuacct"] = hierarchyCpu.get();
+    subsystems.push_back("cpu,cpuacct");
+
+    // Ensure that no other subsystem is attached to the hierarchy.
+    Try<set<string>> _subsystems = cgroups::subsystems(hierarchyCpu.get());
+    if (_subsystems.isError()) {
+      return Error(
+          "Failed to get the list of attached subsystems for hierarchy " +
+          hierarchyCpu.get());
+    } else if (_subsystems.get().size() != 2) {
+      return Error(
+          "Unexpected subsystems found attached to the hierarchy " +
+          hierarchyCpu.get());
+    }
+  } else {
+    // Subsystem cpu and cpuacct are mounted separately.
+    subsystems.push_back("cpu");
+    subsystems.push_back("cpuacct");
+
+    // Ensure that no other subsystem is attached to either of the
+    // hierarchies.
+    Try<set<string>> _subsystems = cgroups::subsystems(hierarchyCpu.get());
+    if (_subsystems.isError()) {
+      return Error(
+          "Failed to get the list of attached subsystems for hierarchy " +
+          hierarchyCpu.get());
+    } else if (_subsystems.get().size() != 1) {
+      return Error(
+          "Unexpected subsystems found attached to the hierarchy " +
+          hierarchyCpu.get());
+    }
+
+    _subsystems = cgroups::subsystems(hierarchyCpuacct.get());
+    if (_subsystems.isError()) {
+      return Error(
+          "Failed to get the list of attached subsystems for hierarchy " +
+          hierarchyCpuacct.get());
+    } else if (_subsystems.get().size() != 1) {
+      return Error(
+          "Unexpected subsystems found attached to the hierarchy " +
+          hierarchyCpuacct.get());
+    }
+  }
+
+  if (flags.cgroups_enable_cfs) {
+    Try<bool> exists = cgroups::exists(
+        hierarchies["cpu"],
+        flags.cgroups_root,
+        "cpu.cfs_quota_us");
+
+    if (exists.isError() || !exists.get()) {
+      return Error(
+          "Failed to find 'cpu.cfs_quota_us'. Your kernel "
+          "might be too old to use the CFS cgroups feature.");
+    }
+  }
+
+  process::Owned<MesosIsolatorProcess> process(
+      new CgroupsCpushareIsolatorProcess(flags, hierarchies, subsystems));
+
+  return new MesosIsolator(process);
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::recover(
+    const list<ContainerState>& states,
+    const hashset<ContainerID>& orphans)
+{
+  foreach (const ContainerState& state, states) {
+    const ContainerID& containerId = state.container_id();
+    const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+    Try<bool> exists = cgroups::exists(hierarchies["cpu"], cgroup);
+    if (exists.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      infos.clear();
+      return Failure(
+          "Failed to check cgroup for container " + stringify(containerId));
+    }
+
+    if (!exists.get()) {
+      // This may occur if the executor has exited and the isolator
+      // has destroyed the cgroup but the slave dies before noticing
+      // this. This will be detected when the containerizer tries to
+      // monitor the executor's pid.
+      LOG(WARNING) << "Couldn't find cgroup for container " << containerId;
+      continue;
+    }
+
+    infos[containerId] = new Info(containerId, cgroup);
+  }
+
+  // Remove orphan cgroups.
+  foreach (const string& subsystem, subsystems) {
+    Try<vector<string>> cgroups = cgroups::get(
+        hierarchies[subsystem],
+        flags.cgroups_root);
+
+    if (cgroups.isError()) {
+      foreachvalue (Info* info, infos) {
+        delete info;
+      }
+      infos.clear();
+      return Failure(cgroups.error());
+    }
+
+    foreach (const string& cgroup, cgroups.get()) {
+      // Ignore the slave cgroup (see the --slave_subsystems flag).
+      // TODO(idownes): Remove this when the cgroups layout is
+      // updated, see MESOS-1185.
+      if (cgroup == path::join(flags.cgroups_root, "slave")) {
+        continue;
+      }
+
+      ContainerID containerId;
+      containerId.set_value(Path(cgroup).basename());
+
+      if (infos.contains(containerId)) {
+        continue;
+      }
+
+      // Known orphan cgroups will be destroyed by the containerizer
+      // using the normal cleanup path. See MESOS-2367 for details.
+      if (orphans.contains(containerId)) {
+        infos[containerId] = new Info(containerId, cgroup);
+        continue;
+      }
+
+      LOG(INFO) << "Removing unknown orphaned cgroup '"
+                << path::join(subsystem, cgroup) << "'";
+
+      // We don't wait on the destroy as we don't want to block recovery.
+      cgroups::destroy(
+          hierarchies[subsystem],
+          cgroup,
+          cgroups::DESTROY_TIMEOUT);
+    }
+  }
+
+  return Nothing();
+}
+
+
+Future<Option<ContainerPrepareInfo>> CgroupsCpushareIsolatorProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  if (infos.contains(containerId)) {
+    return Failure("Container has already been prepared");
+  }
+
+  // TODO(bmahler): Don't insert into 'infos' unless we create the
+  // cgroup successfully. It's safe for now because 'cleanup' gets
+  // called if we return a Failure, but cleanup will fail because the
+  // cgroup does not exist when cgroups::destroy is called.
+  Info* info = new Info(
+      containerId, path::join(flags.cgroups_root, containerId.value()));
+
+  infos[containerId] = info;
+
+  foreach (const string& subsystem, subsystems) {
+    Try<bool> exists = cgroups::exists(hierarchies[subsystem], info->cgroup);
+    if (exists.isError()) {
+      return Failure("Failed to prepare isolator: " + exists.error());
+    } else if (exists.get()) {
+      return Failure("Failed to prepare isolator: cgroup already exists");
+    }
+
+    Try<Nothing> create = cgroups::create(hierarchies[subsystem], info->cgroup);
+    if (create.isError()) {
+      return Failure("Failed to prepare isolator: " + create.error());
+    }
+
+    // Chown the cgroup so the executor can create nested cgroups. Do
+    // not recurse so the control files are still owned by the slave
+    // user and thus cannot be changed by the executor.
+    if (user.isSome()) {
+      Try<Nothing> chown = os::chown(
+          user.get(),
+          path::join(hierarchies[subsystem], info->cgroup),
+          false);
+      if (chown.isError()) {
+        return Failure("Failed to prepare isolator: " + chown.error());
+      }
+    }
+  }
+
+  return update(containerId, executorInfo.resources())
+    .then([]() -> Future<Option<ContainerPrepareInfo>> {
+      return None();
+    });
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::isolate(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  CHECK_NONE(info->pid);
+  info->pid = pid;
+
+  foreach (const string& subsystem, subsystems) {
+    Try<Nothing> assign = cgroups::assign(
+        hierarchies[subsystem],
+        info->cgroup,
+        pid);
+
+    if (assign.isError()) {
+      LOG(ERROR) << "Failed to assign container '" << info->containerId
+                 << " to its own cgroup '"
+                 << path::join(hierarchies[subsystem], info->cgroup)
+                 << "' : " << assign.error();
+
+      return Failure("Failed to isolate container: " + assign.error());
+    }
+  }
+
+  return Nothing();
+}
+
+
+Future<ContainerLimitation> CgroupsCpushareIsolatorProcess::watch(
+    const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  CHECK_NOTNULL(infos[containerId]);
+
+  return infos[containerId]->limitation.future();
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  if (resources.cpus().isNone()) {
+    return Failure("No cpus resource given");
+  }
+
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  const Option<string>& hierarchy = hierarchies.get("cpu");
+  if (hierarchy.isNone()) {
+    return Failure("No 'cpu' hierarchy");
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+  info->resources = resources;
+
+  double cpus = resources.cpus().get();
+
+  // Always set cpu.shares.
+  uint64_t shares;
+
+  if (flags.revocable_cpu_low_priority &&
+      resources.revocable().cpus().isSome()) {
+    shares = std::max(
+        (uint64_t) (CPU_SHARES_PER_CPU_REVOCABLE * cpus),
+        MIN_CPU_SHARES);
+  } else {
+    shares = std::max(
+        (uint64_t) (CPU_SHARES_PER_CPU * cpus),
+        MIN_CPU_SHARES);
+  }
+
+  Try<Nothing> write = cgroups::cpu::shares(
+      hierarchy.get(),
+      info->cgroup,
+      shares);
+
+  if (write.isError()) {
+    return Failure("Failed to update 'cpu.shares': " + write.error());
+  }
+
+  LOG(INFO) << "Updated 'cpu.shares' to " << shares
+            << " (cpus " << cpus << ")"
+            << " for container " << containerId;
+
+  // Set cfs quota if enabled.
+  if (flags.cgroups_enable_cfs) {
+    write = cgroups::cpu::cfs_period_us(
+        hierarchy.get(),
+        info->cgroup,
+        CPU_CFS_PERIOD);
+
+    if (write.isError()) {
+      return Failure("Failed to update 'cpu.cfs_period_us': " + write.error());
+    }
+
+    Duration quota = std::max(CPU_CFS_PERIOD * cpus, MIN_CPU_CFS_QUOTA);
+
+    write = cgroups::cpu::cfs_quota_us(hierarchy.get(), info->cgroup, quota);
+    if (write.isError()) {
+      return Failure("Failed to update 'cpu.cfs_quota_us': " + write.error());
+    }
+
+    LOG(INFO) << "Updated 'cpu.cfs_period_us' to " << CPU_CFS_PERIOD
+              << " and 'cpu.cfs_quota_us' to " << quota
+              << " (cpus " << cpus << ")"
+              << " for container " << containerId;
+  }
+
+  return Nothing();
+}
+
+
+Future<ResourceStatistics> CgroupsCpushareIsolatorProcess::usage(
+    const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  ResourceStatistics result;
+
+  // TODO(chzhcn): Getting the number of processes and threads is
+  // available as long as any cgroup subsystem is used, so this should
+  // not be tied to a specific cgroup isolator. A better place is
+  // probably Linux Launcher, which uses the cgroup freezer subsystem.
+  // That requires some change for it to adopt the new semantics of
+  // reporting subsystem-independent cgroup usage.
+  // Note: The complexity of this operation is linear in the number of
+  // processes and threads in a container: the kernel has to allocate
+  // memory to contain the list of pids or tids; the userspace has to
+  // parse the cgroup files to get the size. If this proves to be a
+  // performance bottleneck, some kind of rate limiting mechanism
+  // needs to be employed.
+  if (flags.cgroups_cpu_enable_pids_and_tids_count) {
+    Try<std::set<pid_t>> pids =
+      cgroups::processes(hierarchies["cpuacct"], info->cgroup);
+    if (pids.isError()) {
+      return Failure("Failed to get number of processes: " + pids.error());
+    }
+
+    result.set_processes(pids.get().size());
+
+    Try<std::set<pid_t>> tids =
+      cgroups::threads(hierarchies["cpuacct"], info->cgroup);
+    if (tids.isError()) {
+      return Failure("Failed to get number of threads: " + tids.error());
+    }
+
+    result.set_threads(tids.get().size());
+  }
+
+  // Get the number of clock ticks, used for cpu accounting.
+  static long ticks = sysconf(_SC_CLK_TCK);
+
+  PCHECK(ticks > 0) << "Failed to get sysconf(_SC_CLK_TCK)";
+
+  // Add the cpuacct.stat information.
+  Try<hashmap<string, uint64_t>> stat = cgroups::stat(
+      hierarchies["cpuacct"],
+      info->cgroup,
+      "cpuacct.stat");
+
+  if (stat.isError()) {
+    return Failure("Failed to read cpuacct.stat: " + stat.error());
+  }
+
+  // TODO(bmahler): Add namespacing to cgroups to enforce the expected
+  // structure, e.g., cgroups::cpuacct::stat.
+  Option<uint64_t> user = stat.get().get("user");
+  Option<uint64_t> system = stat.get().get("system");
+
+  if (user.isSome() && system.isSome()) {
+    result.set_cpus_user_time_secs((double) user.get() / (double) ticks);
+    result.set_cpus_system_time_secs((double) system.get() / (double) ticks);
+  }
+
+  // Add the cpu.stat information only if CFS is enabled.
+  if (flags.cgroups_enable_cfs) {
+    stat = cgroups::stat(hierarchies["cpu"], info->cgroup, "cpu.stat");
+    if (stat.isError()) {
+      return Failure("Failed to read cpu.stat: " + stat.error());
+    }
+
+    Option<uint64_t> nr_periods = stat.get().get("nr_periods");
+    if (nr_periods.isSome()) {
+      result.set_cpus_nr_periods(nr_periods.get());
+    }
+
+    Option<uint64_t> nr_throttled = stat.get().get("nr_throttled");
+    if (nr_throttled.isSome()) {
+      result.set_cpus_nr_throttled(nr_throttled.get());
+    }
+
+    Option<uint64_t> throttled_time = stat.get().get("throttled_time");
+    if (throttled_time.isSome()) {
+      result.set_cpus_throttled_time_secs(
+          Nanoseconds(throttled_time.get()).secs());
+    }
+  }
+
+  return result;
+}
+
+
+Future<Nothing> CgroupsCpushareIsolatorProcess::cleanup(
+    const ContainerID& containerId)
+{
+  // Multiple calls may occur during test clean up.
+  if (!infos.contains(containerId)) {
+    VLOG(1) << "Ignoring cleanup request for unknown container: "
+            << containerId;
+
+    return Nothing();
+  }
+
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  list<Future<Nothing>> futures;
+  foreach (const string& subsystem, subsystems) {
+    futures.push_back(cgroups::destroy(
+        hierarchies[subsystem],
+        info->cgroup,
+        cgroups::DESTROY_TIMEOUT));
+  }
+
+  return collect(futures)
+    .onAny(defer(PID<CgroupsCpushareIsolatorProcess>(this),
+                &CgroupsCpushareIsolatorProcess::_cleanup,
+                containerId,
+                lambda::_1))
+    .then([]() { return Nothing(); });
+}
+
+
+Future<list<Nothing>> CgroupsCpushareIsolatorProcess::_cleanup(
+    const ContainerID& containerId,
+    const Future<list<Nothing>>& future)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  CHECK_NOTNULL(infos[containerId]);
+
+  if (!future.isReady()) {
+    return Failure(
+        "Failed to clean up container " + stringify(containerId) +
+        " : " + (future.isFailed() ? future.failure() : "discarded"));
+  }
+
+  delete infos[containerId];
+  infos.erase(containerId);
+
+  return future;
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp
new file mode 100644
index 0000000..4ce37bf
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cpushare.hpp
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CPUSHARE_ISOLATOR_HPP__
+#define __CPUSHARE_ISOLATOR_HPP__
+
+#include <sys/types.h>
+
+#include <string>
+#include <vector>
+
+#include <process/future.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+
+#include "slave/flags.hpp"
+
+#include "slave/containerizer/isolator.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Use the Linux cpu cgroup controller for cpu isolation which uses the
+// Completely Fair Scheduler (CFS).
+// - cpushare implements proportionally weighted scheduling.
+// - cfs implements hard quota based scheduling.
+//
+// The constructor is private; instances are obtained through the
+// static 'create' factory.
+class CgroupsCpushareIsolatorProcess : public MesosIsolatorProcess
+{
+public:
+  // Factory for this isolator. Returns a 'Try', so creation can fail
+  // with an error instead of yielding an isolator.
+  static Try<mesos::slave::Isolator*> create(const Flags& flags);
+
+  virtual ~CgroupsCpushareIsolatorProcess();
+
+  // NOTE(review): the virtual methods below presumably implement the
+  // 'MesosIsolatorProcess' interface; that base class is not visible
+  // from this header, so confirm against its declaration.
+
+  virtual process::Future<Nothing> recover(
+      const std::list<mesos::slave::ContainerState>& states,
+      const hashset<ContainerID>& orphans);
+
+  virtual process::Future<Option<mesos::slave::ContainerPrepareInfo>> prepare(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user);
+
+  virtual process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  // Destroys the container's cgroup in the hierarchy of every
+  // subsystem in 'subsystems'. A request for a container that is not
+  // in 'infos' is logged and ignored.
+  virtual process::Future<Nothing> cleanup(
+      const ContainerID& containerId);
+
+private:
+  CgroupsCpushareIsolatorProcess(
+      const Flags& flags,
+      const hashmap<std::string, std::string>& hierarchies,
+      const std::vector<std::string>& subsystems);
+
+  // Continuation of 'cleanup', invoked with the collected result of
+  // the cgroup destroys. The container's 'Info' is deleted and removed
+  // from 'infos' only if 'future' is ready; otherwise a failure is
+  // returned and the entry is kept.
+  virtual process::Future<std::list<Nothing>> _cleanup(
+      const ContainerID& containerId,
+      const process::Future<std::list<Nothing>>& future);
+
+  // Per-container state tracked by this isolator.
+  struct Info
+  {
+    Info(const ContainerID& _containerId, const std::string& _cgroup)
+      : containerId(_containerId), cgroup(_cgroup) {}
+
+    const ContainerID containerId;
+
+    // Name of the container's cgroup; 'cleanup' destroys this cgroup
+    // in each hierarchy.
+    const std::string cgroup;
+
+    // Not set by the constructor, hence optional.
+    // NOTE(review): presumably filled in by 'isolate' and 'update'
+    // respectively; confirm in the implementation file.
+    Option<pid_t> pid;
+    Option<Resources> resources;
+
+    // NOTE(review): presumably the promise behind the future returned
+    // by 'watch'; confirm in the implementation file.
+    process::Promise<mesos::slave::ContainerLimitation> limitation;
+  };
+
+  const Flags flags;
+
+  // Map from subsystem to hierarchy.
+  hashmap<std::string, std::string> hierarchies;
+
+  // Subsystems used for this isolator. Typically, there are two
+  // elements in the vector: 'cpu' and 'cpuacct'. If cpu and cpuacct
+  // systems are co-mounted (e.g., systems using systemd), then there
+  // will be only one element in the vector which is 'cpu,cpuacct'.
+  std::vector<std::string> subsystems;
+
+  // Raw owning pointers: entries are deleted in '_cleanup'.
+  // TODO(bmahler): Use Owned<Info>.
+  hashmap<ContainerID, Info*> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CPUSHARE_ISOLATOR_HPP__