You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2017/10/19 23:34:29 UTC

[08/11] mesos git commit: Added socket checking to the network ports isolator.

Added socket checking to the network ports isolator.

Implemented ports resource restrictions in the network ports isolator.
Periodically scan for listening sockets and match them against the
open sockets of the containers that the isolator is tracking. Check
each matched socket's port against the container's ports resource and
trigger a resource limitation if the port has not been allocated.

Review: https://reviews.apache.org/r/60496/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5aec8417
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5aec8417
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5aec8417

Branch: refs/heads/master
Commit: 5aec841704be228272e0b0341fe4bb7cb5647b53
Parents: 91d9a61
Author: James Peach <jp...@apache.org>
Authored: Thu Oct 19 15:35:46 2017 -0700
Committer: James Peach <jp...@apache.org>
Committed: Thu Oct 19 16:33:35 2017 -0700

----------------------------------------------------------------------
 .../mesos/isolators/network/ports.cpp           | 224 ++++++++++++++++++-
 .../mesos/isolators/network/ports.hpp           |  18 +-
 2 files changed, 238 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5aec8417/src/slave/containerizer/mesos/isolators/network/ports.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/network/ports.cpp b/src/slave/containerizer/mesos/isolators/network/ports.cpp
index 3fbc27f..d3ec8ae 100644
--- a/src/slave/containerizer/mesos/isolators/network/ports.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/ports.cpp
@@ -20,21 +20,35 @@
 
 #include <sys/types.h>
 
+#include <process/after.hpp>
+#include <process/async.hpp>
+#include <process/defer.hpp>
 #include <process/id.hpp>
+#include <process/loop.hpp>
 
+#include <stout/lambda.hpp>
 #include <stout/numify.hpp>
 #include <stout/path.hpp>
+#include <stout/proc.hpp>
 
 #include "common/protobuf_utils.hpp"
 #include "common/values.hpp"
 
+#include "linux/cgroups.hpp"
+
+#include "slave/containerizer/mesos/linux_launcher.hpp"
+
 using std::list;
+using std::set;
 using std::string;
 using std::vector;
 
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::Owned;
+using process::defer;
 
 using mesos::slave::ContainerConfig;
 using mesos::slave::ContainerLaunchInfo;
@@ -42,6 +56,7 @@ using mesos::slave::ContainerLimitation;
 using mesos::slave::ContainerState;
 using mesos::slave::Isolator;
 
+using mesos::internal::values::intervalSetToRanges;
 using mesos::internal::values::rangesToIntervalSet;
 
 using namespace routing::diagnosis;
@@ -50,6 +65,94 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
+// Given a cgroup hierarchy and a set of container IDs, collect
+// the ports of all the listening sockets open in each cgroup,
+// indexed by container ID.
+static hashmap<ContainerID, IntervalSet<uint16_t>>
+collectContainerListeners(
+    const string& cgroupsRoot,
+    const string& freezerHierarchy,
+    const hashset<ContainerID>& containerIds)
+{
+  hashmap<ContainerID, IntervalSet<uint16_t>> listeners;
+
+  Try<hashmap<uint32_t, socket::Info>> listenInfos =
+    NetworkPortsIsolatorProcess::getListeningSockets();
+
+  if (listenInfos.isError()) {
+    LOG(ERROR) << "Failed to query listening sockets: "
+               << listenInfos.error();
+    return listeners;
+  }
+
+  if (listenInfos->empty()) {
+    return listeners;
+  }
+
+  foreach (const ContainerID& containerId, containerIds) {
+    // Reconstruct the cgroup path from the container ID.
+    string cgroup = LinuxLauncher::cgroup(cgroupsRoot, containerId);
+
+    VLOG(1) << "Checking processes for container " << containerId
+            << " in cgroup " << cgroup;
+
+    Try<set<pid_t>> pids = cgroups::processes(freezerHierarchy, cgroup);
+    if (pids.isError()) {
+      LOG(ERROR) << "Failed to list processes for container "
+                 << containerId << ": " << pids.error();
+      continue;
+    }
+
+    // For each process in this container, check whether any of its open
+    // sockets matches something in the listening set.
+    foreach (pid_t pid, pids.get()) {
+      Try<vector<uint32_t>> sockets =
+        NetworkPortsIsolatorProcess::getProcessSockets(pid);
+
+      // The PID might have exited since we sampled the cgroup tasks, so
+      // don't worry too much if this fails.
+      if (sockets.isError()) {
+        VLOG(1) << "Failed to list sockets for PID "
+                << stringify(pid) << ": " << sockets.error();
+        continue;
+      }
+
+      if (sockets->empty()) {
+        continue;
+      }
+
+      foreach (uint32_t inode, sockets.get()) {
+        if (!listenInfos->contains(inode)) {
+          continue;
+        }
+
+        const auto& socketInfo = listenInfos->at(inode);
+
+        process::network::inet::Address address(
+            socketInfo.sourceIP.get(),
+            ntohs(socketInfo.sourcePort.get()));
+
+        if (VLOG_IS_ON(1)) {
+          Result<string> cmd = proc::cmdline(pid);
+          if (cmd.isSome()) {
+            VLOG(1) << "PID " << pid << " in container " << containerId
+                    << " (" << cmd.get() << ")"
+                    << " is listening on port " << address.port;
+          } else {
+            VLOG(1) << "PID " << pid << " in container " << containerId
+                    << " is listening on port " << address.port;
+          }
+        }
+
+        listeners[containerId].add(address.port);
+      }
+    }
+  }
+
+  return listeners;
+}
+
+
 // Return a hashmap of routing::diagnosis::socket::Info structures
 // for all listening sockets, indexed by the socket inode.
 Try<hashmap<uint32_t, socket::Info>>
@@ -149,13 +252,34 @@ Try<vector<uint32_t>> NetworkPortsIsolatorProcess::getProcessSockets(pid_t pid)
 
 Try<Isolator*> NetworkPortsIsolatorProcess::create(const Flags& flags)
 {
+  if (flags.launcher != "linux") {
+    return Error("The 'network/ports' isolator requires the 'linux' launcher");
+  }
+
+  Try<string> freezerHierarchy = cgroups::prepare(
+      flags.cgroups_hierarchy,
+      "freezer",
+      flags.cgroups_root);
+
+  if (freezerHierarchy.isError()) {
+    return Error(
+        "Failed to prepare the freezer cgroup: " +
+        freezerHierarchy.error());
+  }
+
   return new MesosIsolator(process::Owned<MesosIsolatorProcess>(
-      new NetworkPortsIsolatorProcess()));
+      new NetworkPortsIsolatorProcess(
+          flags.cgroups_root,
+          freezerHierarchy.get())));
 }
 
 
-NetworkPortsIsolatorProcess::NetworkPortsIsolatorProcess()
-  : ProcessBase(process::ID::generate("network-ports-isolator"))
+NetworkPortsIsolatorProcess::NetworkPortsIsolatorProcess(
+    const string& _cgroupsRoot,
+    const string& _freezerHierarchy)
+  : ProcessBase(process::ID::generate("network-ports-isolator")),
+    cgroupsRoot(_cgroupsRoot),
+    freezerHierarchy(_freezerHierarchy)
 {
 }
 
@@ -189,6 +313,11 @@ Future<Option<ContainerLaunchInfo>> NetworkPortsIsolatorProcess::prepare(
     return Failure("Container has already been prepared");
   }
 
+  // TODO(jpeach) Figure out how to ignore tasks that are not going to use the
+  // host network. If they are in a network namespace (CNI network) then
+  // there's no point restricting them and we would have to implement any
+  // restrictions by entering the right namespaces anyway.
+
   infos.emplace(containerId, Owned<Info>(new Info()));
 
   return update(containerId, containerConfig.resources())
@@ -220,10 +349,27 @@ Future<Nothing> NetworkPortsIsolatorProcess::update(
     return Nothing();
   }
 
+  const Owned<Info>& info = infos.at(containerId);
+
+  // The resources are attached to the root container. For child
+  // containers, just track their existence so that we can scan
+  // processes in the corresponding cgroup.
+  if (containerId.has_parent()) {
+    // Child containers don't get resources, only the parents do.
+    CHECK(resources.empty());
+
+    // Verify that we know about the root for this container.
+    CHECK(infos.contains(protobuf::getRootContainerId(containerId)));
+
+    return Nothing();
+  }
+
   Option<Value::Ranges> ports = resources.ports();
   if (ports.isSome()) {
     const Owned<Info>& info = infos.at(containerId);
     info->ports = rangesToIntervalSet<uint16_t>(ports.get()).get();
+  } else {
+    info->ports = IntervalSet<uint16_t>();
   }
 
   return Nothing();
@@ -243,6 +389,78 @@ Future<Nothing> NetworkPortsIsolatorProcess::cleanup(
   return Nothing();
 }
 
+
+// Given a map of containers to the ports they are listening on,
+// verify that each container is only listening on ports that we
+// have recorded as being allocated to it.
+Future<Nothing> NetworkPortsIsolatorProcess::check(
+    const hashmap<ContainerID, IntervalSet<uint16_t>>& listeners)
+{
+  foreachpair (const ContainerID& containerId,
+               const IntervalSet<uint16_t>& ports,
+               listeners) {
+    if (!infos.contains(containerId)) {
+      continue;
+    }
+
+    ContainerID rootContainerId = protobuf::getRootContainerId(containerId);
+    CHECK(infos.contains(rootContainerId));
+
+    // Find the corresponding root container that holds the resources
+    // for this container.
+    const Owned<Info>& info = infos.at(rootContainerId);
+
+    if (info->ports.isSome() && !info->ports->contains(ports)) {
+      const IntervalSet<uint16_t> unallocatedPorts = ports - info->ports.get();
+
+      Resource resource;
+      resource.set_name("ports");
+      resource.set_type(Value::RANGES);
+      resource.mutable_ranges()->CopyFrom(
+          intervalSetToRanges(unallocatedPorts));
+
+      const string message =
+        "Container " + stringify(containerId) +
+        " is listening on unallocated port(s): " +
+        stringify(resource.ranges());
+
+      LOG(INFO) << message;
+
+      infos.at(containerId)->limitation.set(
+          protobuf::slave::createContainerLimitation(
+              Resources(resource),
+              message,
+              TaskStatus::REASON_CONTAINER_LIMITATION));
+    }
+  }
+
+  return Nothing();
+}
+
+
+void NetworkPortsIsolatorProcess::initialize()
+{
+  process::PID<NetworkPortsIsolatorProcess> self(this);
+
+  // Start a loop to periodically reconcile listening ports against allocated
+  // resources. Note that we have to do this after the process we want the
+  // loop to schedule against (the ports isolator process) has been spawned.
+  process::loop(
+      self,
+      []() {
+        return process::after(PORTS_WATCH_INTERVAL);
+      },
+      [=](const Nothing&) {
+        return process::async(
+            &collectContainerListeners,
+            cgroupsRoot,
+            freezerHierarchy,
+            infos.keys())
+          .then(defer(self, &NetworkPortsIsolatorProcess::check, lambda::_1))
+          .then([]() -> ControlFlow<Nothing> { return Continue(); });
+      });
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/5aec8417/src/slave/containerizer/mesos/isolators/network/ports.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/network/ports.hpp b/src/slave/containerizer/mesos/isolators/network/ports.hpp
index af863ea..acad7bf 100644
--- a/src/slave/containerizer/mesos/isolators/network/ports.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/ports.hpp
@@ -24,6 +24,7 @@
 
 #include <process/owned.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/interval.hpp>
 
@@ -37,6 +38,9 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
+constexpr Duration PORTS_WATCH_INTERVAL = Minutes(1);
+
+
 // The `network/ports` isolator provides isolation of TCP listener
 // ports for tasks that share the host network namespace. It ensures
 // that tasks listen only on ports for which they hold `ports` resources.
@@ -72,8 +76,17 @@ public:
   virtual process::Future<Nothing> cleanup(
       const ContainerID& containerId);
 
+  // Public only for testing.
+  process::Future<Nothing> check(
+      const hashmap<ContainerID, IntervalSet<uint16_t>>& listeners);
+
+protected:
+  virtual void initialize();
+
 private:
-  NetworkPortsIsolatorProcess();
+  NetworkPortsIsolatorProcess(
+      const std::string& _cgroupsRoot,
+      const std::string& _freezerHierarchy);
 
   struct Info
   {
@@ -81,6 +94,9 @@ private:
     process::Promise<mesos::slave::ContainerLimitation> limitation;
   };
 
+  const std::string cgroupsRoot;
+  const std::string freezerHierarchy;
+
   hashmap<ContainerID, process::Owned<Info>> infos;
 };