You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/06/02 06:15:30 UTC

mesos git commit: Added flow classifiers for egress traffic in port mapping isolator using fq_codel qdisc.

Repository: mesos
Updated Branches:
  refs/heads/master 31c40e03c -> 60d690d16


Added flow classifiers for egress traffic in port mapping isolator
using fq_codel qdisc.

Currently we do nothing on the host egress side. By default, the kernel
uses its own hash function to classify packets into different TX
queues (if the hardware interface supports multiqueue). So packets
coming out of different containers could end up queueing in the same
TX queue; in this case, we saw buffer bloat on some TX queues causing
packet drops.

We need to isolate the egress traffic so that containers do not
interfere with each other. The number of hardware TX queues is
limited by the hardware interface and is usually not enough to map
our containers in a 1:1 way, so we need a software solution. We chose
fq_codel and use tc filters to classify packets coming out of
different containers into different fq_codel flows; the CoDel
algorithm on each flow also helps us reduce buffer bloat.

Note that when packets leave fq_codel, they still share the physical
TX queue(s); however, this is (almost) beyond what we can control, so
we have to rely on the kernel's behavior.

Review: https://reviews.apache.org/r/31505


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/60d690d1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/60d690d1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/60d690d1

Branch: refs/heads/master
Commit: 60d690d1693ce8422657afd04caead1b0260e54e
Parents: 31c40e0
Author: Cong Wang <cw...@twopensource.com>
Authored: Mon Jun 1 15:47:30 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Jun 1 21:15:33 2015 -0700

----------------------------------------------------------------------
 src/linux/routing/filter/basic.cpp              |   3 +
 src/linux/routing/filter/filter.hpp             |  14 +-
 src/linux/routing/filter/icmp.cpp               |   3 +
 src/linux/routing/filter/ip.cpp                 |   6 +-
 src/linux/routing/filter/ip.hpp                 |   5 +-
 .../isolators/network/port_mapping.cpp          | 387 +++++++++++++++++--
 .../isolators/network/port_mapping.hpp          |  18 +-
 src/slave/flags.cpp                             |   7 +
 src/slave/flags.hpp                             |   1 +
 9 files changed, 398 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/linux/routing/filter/basic.cpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/basic.cpp b/src/linux/routing/filter/basic.cpp
index 4ce8acb..937e8fc 100644
--- a/src/linux/routing/filter/basic.cpp
+++ b/src/linux/routing/filter/basic.cpp
@@ -125,6 +125,7 @@ Try<bool> create(
           Classifier(protocol),
           priority,
           None(),
+          None(),
           redirect));
 }
 
@@ -143,6 +144,7 @@ Try<bool> create(
           Classifier(protocol),
           priority,
           None(),
+          None(),
           mirror));
 }
 
@@ -169,6 +171,7 @@ Try<bool> update(
           Classifier(protocol),
           None(),
           None(),
+          None(),
           mirror));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/linux/routing/filter/filter.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/filter.hpp b/src/linux/routing/filter/filter.hpp
index aaca57f..7a737e0 100644
--- a/src/linux/routing/filter/filter.hpp
+++ b/src/linux/routing/filter/filter.hpp
@@ -52,16 +52,6 @@ public:
   Filter(const Handle& _parent,
          const Classifier& _classifier,
          const Option<Priority>& _priority,
-         const Option<Handle>& _handle)
-    : parent_(_parent),
-      classifier_(_classifier),
-      priority_(_priority),
-      handle_(_handle) {}
-
-  // Creates a filter with specified classid.
-  Filter(const Handle& _parent,
-         const Classifier& _classifier,
-         const Option<Priority>& _priority,
          const Option<Handle>& _handle,
          const Option<Handle>& _classid)
     : parent_(_parent),
@@ -76,11 +66,13 @@ public:
          const Classifier& _classifier,
          const Option<Priority>& _priority,
          const Option<Handle>& _handle,
+         const Option<Handle>& _classid,
          const Action& action)
     : parent_(_parent),
       classifier_(_classifier),
       priority_(_priority),
-      handle_(_handle)
+      handle_(_handle),
+      classid_(_classid)
   {
     attach(action);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/linux/routing/filter/icmp.cpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/icmp.cpp b/src/linux/routing/filter/icmp.cpp
index 76877fb..12797d7 100644
--- a/src/linux/routing/filter/icmp.cpp
+++ b/src/linux/routing/filter/icmp.cpp
@@ -225,6 +225,7 @@ Try<bool> create(
           classifier,
           priority,
           None(),
+          None(),
           redirect));
 }
 
@@ -243,6 +244,7 @@ Try<bool> create(
           classifier,
           priority,
           None(),
+          None(),
           mirror));
 }
 
@@ -287,6 +289,7 @@ Try<bool> update(
           classifier,
           None(),
           None(),
+          None(),
           mirror));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/linux/routing/filter/ip.cpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/ip.cpp b/src/linux/routing/filter/ip.cpp
index 0f3b856..ddce93f 100644
--- a/src/linux/routing/filter/ip.cpp
+++ b/src/linux/routing/filter/ip.cpp
@@ -477,6 +477,7 @@ Try<bool> create(
           classifier,
           priority,
           None(),
+          None(),
           redirect));
 }
 
@@ -496,6 +497,7 @@ Try<bool> create(
           classifier,
           priority,
           handle,
+          None(),
           redirect));
 }
 
@@ -514,6 +516,7 @@ Try<bool> create(
           classifier,
           priority,
           None(),
+          None(),
           terminal));
 }
 
@@ -531,7 +534,8 @@ Try<bool> create(
           classifier,
           priority,
           None(),
-          classid));
+          classid,
+          action::Terminal()));
 }
 
 Try<bool> remove(

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/linux/routing/filter/ip.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/ip.hpp b/src/linux/routing/filter/ip.hpp
index 9645f94..b8f5204 100644
--- a/src/linux/routing/filter/ip.hpp
+++ b/src/linux/routing/filter/ip.hpp
@@ -189,8 +189,9 @@ Try<bool> create(
 
 
 // Creates an IP packet filter attached to the given parent on the
-// link which will set the classid for packets. Returns false if an IP
-// packet filter attached to the given parent with the same classifier
+// link which will set the classid for packets and stop the IP packets
+// from being sent to the next filter. Returns false if an IP packet
+// filter attached to the given parent with the same classifier
 // already exists.
 Try<bool> create(
     const std::string& link,

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/slave/containerizer/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp
index 781c35b..871e9cf 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.cpp
@@ -69,6 +69,7 @@
 
 #include "linux/routing/link/link.hpp"
 
+#include "linux/routing/queueing/fq_codel.hpp"
 #include "linux/routing/queueing/ingress.hpp"
 
 #include "mesos/resources.hpp"
@@ -130,6 +131,7 @@ static const uint16_t MIN_EPHEMERAL_PORTS_SIZE = 16;
 static const uint8_t ARP_FILTER_PRIORITY = 1;
 static const uint8_t ICMP_FILTER_PRIORITY = 2;
 static const uint8_t IP_FILTER_PRIORITY = 3;
+static const uint8_t DEFAULT_FILTER_PRIORITY = 4;
 
 
 // The secondary priorities used by filters.
@@ -138,6 +140,16 @@ static const uint8_t NORMAL = 2;
 static const uint8_t LOW = 3;
 
 
+// We assign a separate flow on host eth0 egress for each container
+// (See MESOS-2422 for details). Host egress traffic is assigned to a
+// reserved flow (HOST_FLOWID). ARP and ICMP traffic from containers
+// are not heavy, so they can share the same flow.
+static const uint16_t HOST_FLOWID = 1;
+static const uint16_t ARP_FLOWID = 2;
+static const uint16_t ICMP_FLOWID = 2;
+static const uint16_t CONTAINER_MIN_FLOWID = 3;
+
+
 // The well known ports. Used for sanity check.
 static Interval<uint16_t> WELL_KNOWN_PORTS()
 {
@@ -748,6 +760,10 @@ PortMappingIsolatorProcess::Metrics::Metrics()
         "port_mapping/adding_eth0_ip_filters_errors"),
     adding_eth0_ip_filters_already_exist(
         "port_mapping/adding_eth0_ip_filters_already_exist"),
+    adding_eth0_egress_filters_errors(
+        "port_mapping/adding_eth0_egress_filters_errors"),
+    adding_eth0_egress_filters_already_exist(
+        "port_mapping/adding_eth0_egress_filters_already_exist"),
     adding_lo_ip_filters_errors(
         "port_mapping/adding_lo_ip_filters_errors"),
     adding_lo_ip_filters_already_exist(
@@ -776,6 +792,10 @@ PortMappingIsolatorProcess::Metrics::Metrics()
         "port_mapping/removing_eth0_ip_filters_errors"),
     removing_eth0_ip_filters_do_not_exist(
         "port_mapping/removing_eth0_ip_filters_do_not_exist"),
+    removing_eth0_egress_filters_errors(
+        "port_mapping/removing_eth0_egress_filters_errors"),
+    removing_eth0_egress_filters_do_not_exist(
+        "port_mapping/removinging_eth0_egress_filters_do_not_exist"),
     removing_lo_ip_filters_errors(
         "port_mapping/removing_lo_ip_filters_errors"),
     removing_lo_ip_filters_do_not_exist(
@@ -1196,11 +1216,34 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
 
   // Prepare the ingress queueing disciplines on host public interface
   // (eth0) and host loopback interface (lo).
-  Try<bool> createHostEth0Qdisc = ingress::create(eth0.get());
-  if (createHostEth0Qdisc.isError()) {
+  Try<bool> createHostEth0IngressQdisc = ingress::create(eth0.get());
+  if (createHostEth0IngressQdisc.isError()) {
     return Error(
         "Failed to create the ingress qdisc on " + eth0.get() +
-        ": " + createHostEth0Qdisc.error());
+        ": " + createHostEth0IngressQdisc.error());
+  }
+
+  set<uint16_t> freeFlowIds;
+  if (flags.egress_unique_flow_per_container) {
+    // Prepare a fq_codel queueing discipline on host public interface
+    // (eth0) for egress flow classification.
+    //
+    // TODO(cwang): Maybe we can continue when some other egress qdisc
+    // exists because this is not a necessary qdisc for network
+    // isolation, but we don't want inconsistency, so we just fail in
+    // this case. See details in MESOS-2370.
+    Try<bool> createHostEth0EgressQdisc = fq_codel::create(eth0.get());
+    if (createHostEth0EgressQdisc.isError()) {
+      return Error(
+          "Failed to create the egress qdisc on " + eth0.get() +
+          ": " + createHostEth0EgressQdisc.error());
+    }
+
+    // TODO(cwang): Make sure DEFAULT_FLOWS is large enough so that
+    // it's unlikely to run out of free flow IDs.
+    for(uint16_t i = CONTAINER_MIN_FLOWID; i < fq_codel::DEFAULT_FLOWS; i++) {
+      freeFlowIds.insert(i);
+    }
   }
 
   Try<bool> createHostLoQdisc = ingress::create(lo.get());
@@ -1419,7 +1462,8 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
           hostNetworkConfigurations,
           egressRateLimitPerContainer,
           nonEphemeralPorts,
-          ephemeralPortsAllocator)));
+          ephemeralPortsAllocator,
+          freeFlowIds)));
 }
 
 
@@ -1628,8 +1672,6 @@ Future<Nothing> PortMappingIsolatorProcess::recover(
       foreachvalue (Info* info, infos) {
         delete info;
       }
-      infos.clear();
-      unmanaged.clear();
 
       return Failure(
           "Failed to recover container " + stringify(containerId) +
@@ -1651,8 +1693,6 @@ Future<Nothing> PortMappingIsolatorProcess::recover(
       foreachvalue (Info* info, infos) {
         delete info;
       }
-      infos.clear();
-      unmanaged.clear();
 
       return Failure(
           "Failed to recover orphaned container with pid " +
@@ -1680,8 +1720,6 @@ Future<Nothing> PortMappingIsolatorProcess::recover(
       foreachvalue (Info* info, infos) {
         delete info;
       }
-      infos.clear();
-      unmanaged.clear();
 
       return Failure(
           "Failed to cleanup orphaned container with pid " +
@@ -1689,6 +1727,9 @@ Future<Nothing> PortMappingIsolatorProcess::recover(
     }
   }
 
+  // TODO(cwang): Consider removing unrecognized flow classifiers from
+  // host eth0 egress.
+
   LOG(INFO) << "Network isolator recovery complete";
 
   return Nothing();
@@ -1704,31 +1745,92 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
   // sure that we add filters to veth before adding filters to host
   // eth0 and host lo. Also, we need to make sure we remove filters
   // from host eth0 and host lo before removing filters from veth.
-  Result<vector<ip::Classifier>> classifiers =
+  Result<vector<ip::Classifier>> vethIngressClassifiers =
     ip::classifiers(veth(pid), ingress::HANDLE);
 
-  if (classifiers.isError()) {
+  if (vethIngressClassifiers.isError()) {
     return Error(
         "Failed to get all the IP filters on " + veth(pid) +
-        ": " + classifiers.error());
-  } else if (classifiers.isNone()) {
+        ": " + vethIngressClassifiers.error());
+  } else if (vethIngressClassifiers.isNone()) {
     return Error(
         "Failed to get all the IP filters on " + veth(pid) +
         ": link does not exist");
   }
 
+  hashmap<PortRange, uint16_t> flowIds;
+
+  if (flags.egress_unique_flow_per_container) {
+    // Get all egress IP flow classifiers on eth0.
+    Result<vector<filter::Filter<ip::Classifier>>> eth0EgressFilters =
+      ip::filters(eth0, fq_codel::HANDLE);
+
+    if (eth0EgressFilters.isError()) {
+      return Error(
+          "Failed to get all the IP flow classifiers on " + eth0 +
+          ": " + eth0EgressFilters.error());
+    } else if (eth0EgressFilters.isNone()) {
+      return Error(
+          "Failed to get all the IP flow classifiers on " + eth0 +
+          ": link does not exist");
+    }
+
+    // Construct a port range to flow ID mapping from host eth0
+    // egress. This map will be used later.
+    foreach (const filter::Filter<ip::Classifier>& filter,
+             eth0EgressFilters.get()) {
+      const Option<PortRange> sourcePorts = filter.classifier().sourcePorts();
+      const Option<Handle> classid = filter.classid();
+
+      if (sourcePorts.isNone()) {
+        return Error("Missing source ports for filters on egress of " + eth0);
+      }
+
+      if (classid.isNone()) {
+        return Error("Missing classid for filters on egress of " + eth0);
+      }
+
+      if (flowIds.contains(sourcePorts.get())) {
+        return Error(
+          "Duplicated port range " + stringify(sourcePorts.get()) +
+          " detected on egress of " + eth0);
+      }
+
+      flowIds[sourcePorts.get()] = classid.get().secondary();
+    }
+  }
+
   IntervalSet<uint16_t> nonEphemeralPorts;
   IntervalSet<uint16_t> ephemeralPorts;
+  Option<uint16_t> flowId;
 
-  foreach (const ip::Classifier& classifier, classifiers.get()) {
-    Option<PortRange> sourcePorts = classifier.sourcePorts();
-    Option<PortRange> destinationPorts = classifier.destinationPorts();
+  foreach (const ip::Classifier& classifier, vethIngressClassifiers.get()) {
+    const Option<PortRange> sourcePorts = classifier.sourcePorts();
+    const Option<PortRange> destinationPorts = classifier.destinationPorts();
 
     // All the IP filters on veth used by us only have source ports.
     if (sourcePorts.isNone() || destinationPorts.isSome()) {
       return Error("Unexpected IP filter detected on " + veth(pid));
     }
 
+    if (flowIds.contains(sourcePorts.get())) {
+      if (flowId.isNone()) {
+        flowId = flowIds.get(sourcePorts.get());
+      } else if (flowId != flowIds.get(sourcePorts.get())) {
+        return Error(
+            "A container is associated with multiple flows "
+            "on egress of " + eth0);
+      }
+    } else if (flowId.isSome()) {
+      // This is the case where some port range of a container is
+      // assigned to a flow while some isn't. This could happen if
+      // slave crashes while those filters are created. However, this
+      // is OK for us because packets by default go to the host flow.
+      LOG(WARNING) << "Container port range " << sourcePorts.get()
+                   << " does not have flow id " << flowId.get()
+                   << " assigned";
+    }
+
     Interval<uint16_t> ports =
       (Bound<uint16_t>::closed(sourcePorts.get().begin()),
        Bound<uint16_t>::closed(sourcePorts.get().end()));
@@ -1777,6 +1879,11 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
             << " and ephemeral port range " << *ephemeralPorts.begin();
   }
 
+  if (flowId.isSome()) {
+    freeFlowIds.erase(flowId.get());
+    info->flowId = flowId.get();
+  }
+
   return CHECK_NOTNULL(info);
 }
 
@@ -1866,6 +1973,10 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
 
   info->pid = pid;
 
+  if (flags.egress_unique_flow_per_container) {
+    info->flowId = getNextFlowId();
+  }
+
   // Bind mount the network namespace handle of the process 'pid' to a
   // directory to hold an extra reference to the network namespace
   // which will be released in 'cleanup'. By holding the extra
@@ -1951,10 +2062,16 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
   // redirect IP traffic to/from containers.
   foreach (const PortRange& range,
            getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) {
-    LOG(INFO) << "Adding IP packet filters with ports " << range
-              << " for container " << containerId;
+    if (info->flowId.isSome()) {
+      LOG(INFO) << "Adding IP packet filters with ports " << range
+                << " with flow ID " << info->flowId.get()
+                << " for container " << containerId;
+    } else {
+      LOG(INFO) << "Adding IP packet filters with ports " << range
+                << " for container " << containerId;
+    }
 
-    Try<Nothing> add = addHostIPFilters(range, veth(pid));
+    Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid));
     if (add.isError()) {
       return Failure(
           "Failed to add IP packet filter with ports " +
@@ -2007,8 +2124,9 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
         " to host " + eth0 + " already exists");
   }
 
-  // Mirror ICMP and ARP packets from host eth0 to veths of all the
-  // containers.
+  // Setup filters for ICMP and ARP packets. We mirror ICMP and ARP
+  // packets from host eth0 to veths of all the containers. We also
+  // setup flow classifiers for host eth0 egress.
   set<string> targets;
   foreachvalue (Info* info, infos) {
     if (info->pid.isSome()) {
@@ -2017,7 +2135,11 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
   }
 
   if (targets.size() == 1) {
-    // Create a new ICMP filter on host eth0.
+    // We just create the first container in which case we should
+    // create filters for ICMP and ARP packets.
+
+    // Create a new ICMP filter on host eth0 ingress for mirroring
+    // packets from host eth0 to veth.
     Try<bool> icmpEth0ToVeth = filter::icmp::create(
         eth0,
         ingress::HANDLE,
@@ -2038,7 +2160,8 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
           "The ICMP packet filter on host " + eth0 + " already exists");
     }
 
-    // Create a new ARP filter on host eth0.
+    // Create a new ARP filter on host eth0 ingress for mirroring
+    // packets from host eth0 to veth.
     Try<bool> arpEth0ToVeth = filter::basic::create(
         eth0,
         ingress::HANDLE,
@@ -2058,8 +2181,77 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
       return Failure(
           "The ARP packet filter on host " + eth0 + " already exists");
     }
+
+    if (flags.egress_unique_flow_per_container) {
+      // Create a new ICMP filter on host eth0 egress for classifying
+      // packets into a reserved flow.
+      Try<bool> icmpEth0Egress = filter::icmp::create(
+          eth0,
+          fq_codel::HANDLE,
+          icmp::Classifier(None()),
+          Priority(ICMP_FILTER_PRIORITY, NORMAL),
+          Handle(fq_codel::HANDLE, ICMP_FLOWID));
+
+      if (icmpEth0Egress.isError()) {
+        ++metrics.adding_eth0_egress_filters_errors;
+
+        return Failure(
+            "Failed to create the ICMP flow classifier on host " +
+            eth0 + ": " + icmpEth0Egress.error());
+      } else if (!icmpEth0Egress.get()) {
+        ++metrics.adding_eth0_egress_filters_already_exist;
+
+        return Failure(
+            "The ICMP flow classifier on host " + eth0 + " already exists");
+      }
+
+      // Create a new ARP filter on host eth0 egress for classifying
+      // packets into a reserved flow.
+      Try<bool> arpEth0Egress = filter::basic::create(
+          eth0,
+          fq_codel::HANDLE,
+          ETH_P_ARP,
+          Priority(ARP_FILTER_PRIORITY, NORMAL),
+          Handle(fq_codel::HANDLE, ARP_FLOWID));
+
+      if (arpEth0Egress.isError()) {
+        ++metrics.adding_eth0_egress_filters_errors;
+
+        return Failure(
+            "Failed to create the ARP flow classifier on host " +
+            eth0 + ": " + arpEth0Egress.error());
+      } else if (!arpEth0Egress.get()) {
+        ++metrics.adding_eth0_egress_filters_already_exist;
+
+        return Failure(
+            "The ARP flow classifier on host " + eth0 + " already exists");
+      }
+
+      // Rest of the host packets go to a reserved flow.
+      Try<bool> defaultEth0Egress = filter::basic::create(
+          eth0,
+          fq_codel::HANDLE,
+          ETH_P_ALL,
+          Priority(DEFAULT_FILTER_PRIORITY, NORMAL),
+          Handle(fq_codel::HANDLE, HOST_FLOWID));
+
+      if (defaultEth0Egress.isError()) {
+        ++metrics.adding_eth0_egress_filters_errors;
+
+        return Failure(
+            "Failed to create the default flow classifier on host " +
+            eth0 + ": " + defaultEth0Egress.error());
+      } else if (!defaultEth0Egress.get()) {
+        // NOTE: Since we don't remove this filter on purpose in
+        // _cleanup() (see the comments there), we just continue even
+        // if it already exists, so do nothing here.
+      }
+    }
   } else {
-    // Update the ICMP filter on host eth0.
+    // This is not the first container in which case we should update
+    // filters for ICMP and ARP packets.
+
+    // Update the ICMP filter on host eth0 ingress.
     Try<bool> icmpEth0ToVeth = filter::icmp::update(
         eth0,
         ingress::HANDLE,
@@ -2079,7 +2271,7 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
           "The ICMP packet filter on host " + eth0 + " already exists");
     }
 
-    // Update the ARP filter on host eth0.
+    // Update the ARP filter on host eth0 ingress.
     Try<bool> arpEth0ToVeth = filter::basic::update(
         eth0,
         ingress::HANDLE,
@@ -2257,10 +2449,18 @@ Future<Nothing> PortMappingIsolatorProcess::update(
   vector<PortRange> portsToAdd = getPortRanges(nonEphemeralPorts - remaining);
 
   foreach (const PortRange& range, portsToAdd) {
-    LOG(INFO) << "Adding IP packet filters with ports " << range
-              << " for container " << containerId;
+    if (info->flowId.isSome()) {
+      LOG(INFO) << "Adding IP packet filters with ports " << range
+                << " with flow ID " << info->flowId.get()
+                << " for container " << containerId;
+    } else {
+      LOG(INFO) << "Adding IP packet filters with ports " << range
+                << " for container " << containerId;
+    }
 
-    Try<Nothing> add = addHostIPFilters(range, veth(pid));
+    // All IP packets from a container will be assigned a single flow
+    // on host eth0.
+    Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid));
     if (add.isError()) {
       return Failure(
           "Failed to add IP packet filter with ports " +
@@ -2614,6 +2814,13 @@ Try<Nothing> PortMappingIsolatorProcess::_cleanup(
   LOG(INFO) << "Freed ephemeral ports " << info->ephemeralPorts
             << " for container with pid " << pid;
 
+  if (info->flowId.isSome()) {
+    freeFlowIds.insert(info->flowId.get());
+
+    LOG(INFO) << "Freed flow ID " << info->flowId.get()
+              << " used by container with pid " << pid;
+  }
+
   set<string> targets;
   foreachvalue (Info* info, infos) {
     if (info->pid.isSome()) {
@@ -2623,7 +2830,7 @@ Try<Nothing> PortMappingIsolatorProcess::_cleanup(
 
   if (targets.empty()) {
     // This is the last container, remove the ARP and ICMP filters on
-    // host eth0.
+    // host eth0, remove the flow classifiers on eth0 egress too.
 
     // Remove the ICMP filter on host eth0.
     Try<bool> icmpEth0ToVeth = filter::icmp::remove(
@@ -2663,6 +2870,64 @@ Try<Nothing> PortMappingIsolatorProcess::_cleanup(
                  << " does not exist";
     }
 
+    if (flags.egress_unique_flow_per_container) {
+      // Remove the ICMP flow classifier on host eth0.
+      Try<bool> icmpEth0Egress = filter::icmp::remove(
+          eth0,
+          fq_codel::HANDLE,
+          icmp::Classifier(None()));
+
+      if (icmpEth0Egress.isError()) {
+        ++metrics.removing_eth0_egress_filters_errors;
+
+        errors.push_back(
+            "Failed to remove the ICMP flow classifier on host " + eth0 +
+            ": " + icmpEth0Egress.error());
+      } else if (!icmpEth0Egress.get()) {
+        ++metrics.removing_eth0_egress_filters_do_not_exist;
+
+        LOG(ERROR) << "The ICMP flow classifier on host " << eth0
+                   << " does not exist";
+      }
+
+      // Remove the ARP flow classifier on host eth0.
+      Try<bool> arpEth0Egress = filter::basic::remove(
+          eth0,
+          fq_codel::HANDLE,
+          ETH_P_ARP);
+
+      if (arpEth0Egress.isError()) {
+        ++metrics.removing_eth0_egress_filters_errors;
+
+        errors.push_back(
+            "Failed to remove the ARP flow classifier on host " + eth0 +
+            ": " + arpEth0Egress.error());
+      } else if (!arpEth0Egress.get()) {
+        ++metrics.removing_eth0_egress_filters_do_not_exist;
+
+        LOG(ERROR) << "The ARP flow classifier on host " << eth0
+                   << " does not exist";
+      }
+
+      // Kernel creates a place-holder filter, with handle 0, for each
+      // tuple (protocol, priority, kind). Our current implementation
+      // doesn't remove them, so all these filters are left. Packets
+      // will be dropped because these filters don't set a valid flow
+      // ID. We have to work around this situation for egress. The
+      // long term solution is removing all these filters after our
+      // own filters are all gone, see the upstream commit
+      // 1e052be69d045c8d0f82ff1116fd3e5a79661745 from:
+      // http://git.kernel.org/cgit/linux/kernel/git/davem/net-next.git.
+      //
+      // So, here we do NOT remove the default flow classifier on host
+      // eth0 on purpose so that after all containers are gone the
+      // host traffic still goes into this flow, this guarantees no
+      // traffic will be dropped by fq_codel qdisc.
+      //
+      // Maybe we need to remove the fq_codel qdisc on host eth0.
+      // TODO(cwang): Revise this in MESOS-2370, we don't remove
+      // ingress qdisc either.
+    }
   } else {
     // This is not the last container. Replace the ARP and ICMP
     // filters. The reason we do this is that we don't have an easy
@@ -2768,6 +3033,7 @@ Try<Nothing> PortMappingIsolatorProcess::_cleanup(
 // port range.
 Try<Nothing> PortMappingIsolatorProcess::addHostIPFilters(
     const PortRange& range,
+    const Option<uint16_t>& flowId,
     const string& veth)
 {
   // NOTE: The order in which these filters are added is important!
@@ -2906,6 +3172,32 @@ Try<Nothing> PortMappingIsolatorProcess::addHostIPFilters(
         veth + " already exists");
   }
 
+  if (flowId.isSome()) {
+    // Add IP packet filters to classify traffic sending to eth0
+    // in the same way so that traffic of each container will be
+    // classified to different flows defined by fq_codel.
+    Try<bool> hostEth0Egress = filter::ip::create(
+        eth0,
+        fq_codel::HANDLE,
+        ip::Classifier(None(), None(), range, None()),
+        Priority(IP_FILTER_PRIORITY, LOW),
+        Handle(fq_codel::HANDLE, flowId.get()));
+
+    if (hostEth0Egress.isError()) {
+      ++metrics.adding_eth0_egress_filters_errors;
+
+      return Error(
+          "Failed to create a flow classifier for " + veth +
+          " on host " + eth0 + ": " + hostEth0Egress.error());
+    } else if (!hostEth0Egress.get()) {
+      ++metrics.adding_eth0_egress_filters_already_exist;
+
+      return Error(
+          "The flow classifier for veth " + veth +
+          " on host " + eth0 + " already exists");
+    }
+  }
+
   return Nothing();
 }
 
@@ -2960,6 +3252,27 @@ Try<Nothing> PortMappingIsolatorProcess::removeHostIPFilters(
                << " to " << veth << " does not exist";
   }
 
+  if (flags.egress_unique_flow_per_container) {
+    // Remove the egress flow classifier on host eth0.
+    Try<bool> hostEth0Egress = filter::ip::remove(
+        eth0,
+        fq_codel::HANDLE,
+        ip::Classifier(None(), None(), range, None()));
+
+    if (hostEth0Egress.isError()) {
+      ++metrics.removing_eth0_egress_filters_errors;
+
+      return Error(
+          "Failed to remove the flow classifier from host " +
+          eth0 + " for " + veth + ": " + hostEth0Egress.error());
+    } else if (!hostEth0Egress.get()) {
+      ++metrics.removing_eth0_egress_filters_do_not_exist;
+
+      LOG(ERROR) << "The flow classifier from host " << eth0
+                 << " for " << range << " does not exist";
+    }
+  }
+
   // Now, we try to remove filters on veth. No need to proceed if the
   // user does not ask us to do so.
   if (!removeFiltersOnVeth) {
@@ -3177,6 +3490,20 @@ string PortMappingIsolatorProcess::scripts(Info* info)
   return script.str();
 }
 
+
+uint16_t PortMappingIsolatorProcess::getNextFlowId()
+{
+  // NOTE: It is very unlikely that we exhaust all the flow IDs.
+  CHECK(freeFlowIds.begin() != freeFlowIds.end());
+
+  uint16_t flowId = *freeFlowIds.begin();
+
+  freeFlowIds.erase(freeFlowIds.begin());
+
+  return flowId;
+}
+
+
 ////////////////////////////////////////////////////
 // Implementation for the ephemeral ports allocator.
 ////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
index 9f83af1..4c719b1 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.hpp
@@ -23,6 +23,7 @@
 
 #include <sys/types.h>
 
+#include <set>
 #include <string>
 #include <vector>
 
@@ -206,6 +207,7 @@ private:
     const Interval<uint16_t> ephemeralPorts;
 
     Option<pid_t> pid;
+    Option<uint16_t> flowId;
   };
 
   // Define the metrics used by the port mapping network isolator.
@@ -216,6 +218,8 @@ private:
 
     process::metrics::Counter adding_eth0_ip_filters_errors;
     process::metrics::Counter adding_eth0_ip_filters_already_exist;
+    process::metrics::Counter adding_eth0_egress_filters_errors;
+    process::metrics::Counter adding_eth0_egress_filters_already_exist;
     process::metrics::Counter adding_lo_ip_filters_errors;
     process::metrics::Counter adding_lo_ip_filters_already_exist;
     process::metrics::Counter adding_veth_ip_filters_errors;
@@ -230,6 +234,8 @@ private:
     process::metrics::Counter adding_eth0_arp_filters_already_exist;
     process::metrics::Counter removing_eth0_ip_filters_errors;
     process::metrics::Counter removing_eth0_ip_filters_do_not_exist;
+    process::metrics::Counter removing_eth0_egress_filters_errors;
+    process::metrics::Counter removing_eth0_egress_filters_do_not_exist;
     process::metrics::Counter removing_lo_ip_filters_errors;
     process::metrics::Counter removing_lo_ip_filters_do_not_exist;
     process::metrics::Counter removing_veth_ip_filters_errors;
@@ -258,7 +264,8 @@ private:
       const hashmap<std::string, std::string>& _hostNetworkConfigurations,
       const Option<Bytes>& _egressRateLimitPerContainer,
       const IntervalSet<uint16_t>& _managedNonEphemeralPorts,
-      const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator)
+      const process::Owned<EphemeralPortsAllocator>& _ephemeralPortsAllocator,
+      const std::set<uint16_t>& _flowIDs)
     : flags(_flags),
       eth0(_eth0),
       lo(_lo),
@@ -269,7 +276,8 @@ private:
       hostNetworkConfigurations(_hostNetworkConfigurations),
       egressRateLimitPerContainer(_egressRateLimitPerContainer),
       managedNonEphemeralPorts(_managedNonEphemeralPorts),
-      ephemeralPortsAllocator(_ephemeralPortsAllocator) {}
+      ephemeralPortsAllocator(_ephemeralPortsAllocator),
+      freeFlowIds(_flowIDs) {}
 
   // Continuations.
   Try<Nothing> _cleanup(Info* info, const Option<ContainerID>& containerId);
@@ -290,6 +298,7 @@ private:
   // Helper functions.
   Try<Nothing> addHostIPFilters(
       const routing::filter::ip::PortRange& range,
+      const Option<uint16_t>& flowId,
       const std::string& veth);
 
   Try<Nothing> removeHostIPFilters(
@@ -300,6 +309,8 @@ private:
   // Return the scripts that will be executed in the child context.
   std::string scripts(Info* info);
 
+  uint16_t getNextFlowId();
+
   const Flags flags;
 
   const std::string eth0;
@@ -323,6 +334,9 @@ private:
 
   process::Owned<EphemeralPortsAllocator> ephemeralPortsAllocator;
 
+  // Store a set of unused flow IDs on this slave.
+  std::set<uint16_t> freeFlowIds;
+
   hashmap<ContainerID, Info*> infos;
 
   // Recovered containers from a previous run that weren't managed by

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index ab87098..17981b9 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -397,6 +397,13 @@ mesos::internal::slave::Flags::Flags()
       "This flag uses the Bytes type (defined in stout) and is used for\n"
       "the 'network/port_mapping' isolator.");
 
+  add(&Flags::egress_unique_flow_per_container,
+      "egress_unique_flow_per_container",
+      "Whether to assign an individual flow for each container for the\n"
+      "egress traffic. This flag is used for the 'network/port_mapping'\n"
+      "isolator.",
+      false);
+
   add(&Flags::network_enable_socket_statistics_summary,
       "network_enable_socket_statistics_summary",
       "Whether to collect socket statistics summary for each container.\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/60d690d1/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 84dbb8a..15dd838 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -95,6 +95,7 @@ public:
   Option<std::string> eth0_name;
   Option<std::string> lo_name;
   Option<Bytes> egress_rate_limit_per_container;
+  bool egress_unique_flow_per_container;
   bool network_enable_socket_statistics_summary;
   bool network_enable_socket_statistics_details;
 #endif