Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/04/23 02:36:32 UTC

hadoop git commit: YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing network bandwidth traffic originating from YARN containers. Contributed by Sidharta Seethana.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0ebe84d30 -> a100be685


YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing network bandwidth traffic originating from YARN containers. Contributed by Sidharta Seethana.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a100be68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a100be68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a100be68

Branch: refs/heads/trunk
Commit: a100be685cc4521e9949589948219231aa5d2733
Parents: 0ebe84d
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Wed Apr 22 17:26:13 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Apr 22 17:26:13 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   4 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  38 +-
 .../nodemanager/LinuxContainerExecutor.java     | 100 ++-
 .../OutboundBandwidthResourceHandler.java       |  29 +
 .../linux/resources/ResourceHandlerModule.java  | 128 ++++
 .../TrafficControlBandwidthHandlerImpl.java     | 281 ++++++++
 .../linux/resources/TrafficController.java      | 650 +++++++++++++++++++
 .../resources/TestResourceHandlerModule.java    |  78 +++
 .../TestTrafficControlBandwidthHandlerImpl.java | 231 +++++++
 .../linux/resources/TestTrafficController.java  | 327 ++++++++++
 10 files changed, 1864 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 975db66..21ef32d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -96,6 +96,10 @@ Release 2.8.0 - UNRELEASED
     YARN-3225. New parameter of CLI for decommissioning node gracefully in 
     RMAdmin CLI. (Devaraj K via junping_du)
 
+    YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing
+    network bandwidth traffic originating from YARN containers (Sidharta Seethana
+    via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 253ae08..a7f485d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -822,7 +822,43 @@ public class YarnConfiguration extends Configuration {
       NM_PREFIX + "resource.percentage-physical-cpu-limit";
   public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
       100;
-  
+
+
+  public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
+
+  /** This setting controls if resource handling for network bandwidth is enabled **/
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_ENABLED =
+      NM_NETWORK_RESOURCE_PREFIX + "enabled";
+  /** Network as a resource is disabled by default **/
+  @Private
+  public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
+
+  /** Specifies the interface to be used for applying network throttling rules **/
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_INTERFACE =
+      NM_NETWORK_RESOURCE_PREFIX + "interface";
+  @Private
+  public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
+
+  /** Specifies the total available outbound bandwidth on the node **/
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
+      NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
+  @Private
+  public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
+
+  /** Specifies the total outbound bandwidth available to YARN containers. Defaults to
+   * NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
+   */
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
+      NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";
+
   /** NM Webapp address.**/
   public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
   public static final int DEFAULT_NM_WEBAPP_PORT = 8042;
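
For reference, a minimal sketch (not part of the patch; the bandwidth values are
illustrative) of how the new, still private/unstable network resource settings
above could be set programmatically:

    Configuration conf = new YarnConfiguration();
    // Enable classification/shaping of outbound traffic (disabled by default).
    conf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, true);
    // Interface the tc rules are attached to (defaults to eth0).
    conf.set(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE, "eth0");
    // Total outbound bandwidth on the node, and the share available to YARN.
    conf.setInt(YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, 1000);
    conf.setInt(
        YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, 700);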

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index d6e6894..f8da958 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -43,7 +43,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
 import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -60,6 +66,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   private boolean containerSchedPriorityIsSet = false;
   private int containerSchedPriorityAdjustment = 0;
   private boolean containerLimitUsers;
+  private ResourceHandler resourceHandlerChain;
 
   @Override
   public void setConf(Configuration conf) {
@@ -189,7 +196,20 @@ public class LinuxContainerExecutor extends ContainerExecutor {
       throw new IOException("Linux container executor not configured properly"
           + " (error=" + exitCode + ")", e);
     }
-   
+
+    try {
+      Configuration conf = super.getConf();
+
+      resourceHandlerChain = ResourceHandlerModule
+          .getConfiguredResourceHandlerChain(conf);
+      if (resourceHandlerChain != null) {
+        resourceHandlerChain.bootstrap(conf);
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Failed to bootstrap configured resource subsystems! ", e);
+      throw new IOException("Failed to bootstrap configured resource subsystems!");
+    }
+
     resourcesHandler.init(this);
   }
   
@@ -268,6 +288,51 @@ public class LinuxContainerExecutor extends ContainerExecutor {
             container.getResource());
     String resourcesOptions = resourcesHandler.getResourcesOption(
             containerId);
+    String tcCommandFile = null;
+
+    try {
+      if (resourceHandlerChain != null) {
+        List<PrivilegedOperation> ops = resourceHandlerChain
+            .preStart(container);
+
+        if (ops != null) {
+          List<PrivilegedOperation> resourceOps = new ArrayList<>();
+
+          resourceOps.add(new PrivilegedOperation
+              (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+                  resourcesOptions));
+
+          for (PrivilegedOperation op : ops) {
+            switch (op.getOperationType()) {
+              case ADD_PID_TO_CGROUP:
+                resourceOps.add(op);
+                break;
+              case TC_MODIFY_STATE:
+                tcCommandFile = op.getArguments().get(0);
+                break;
+              default:
+                LOG.warn("PrivilegedOperation type unsupported in launch: "
+                    + op.getOperationType());
+            }
+          }
+
+          if (resourceOps.size() > 1) {
+            //squash resource operations
+            try {
+              PrivilegedOperation operation = PrivilegedOperationExecutor
+                  .squashCGroupOperations(resourceOps);
+              resourcesOptions = operation.getArguments().get(0);
+            } catch (PrivilegedOperationException e) {
+              LOG.error("Failed to squash cgroup operations!", e);
+              throw new ResourceHandlerException("Failed to squash cgroup operations!");
+            }
+          }
+        }
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("ResourceHandlerChain.preStart() failed!", e);
+      throw new IOException("ResourceHandlerChain.preStart() failed!");
+    }
 
     ShellCommandExecutor shExec = null;
 
@@ -286,6 +351,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
             StringUtils.join(",", localDirs),
             StringUtils.join(",", logDirs),
             resourcesOptions));
+
+        if (tcCommandFile != null) {
+            command.add(tcCommandFile);
+        }
+
         String[] commandArray = command.toArray(new String[command.size()]);
         shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
             container.getLaunchContext().getEnvironment()); // sanitized env
@@ -334,6 +404,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
       return exitCode;
     } finally {
       resourcesHandler.postExecute(containerId);
+
+      try {
+        if (resourceHandlerChain != null) {
+          resourceHandlerChain.postComplete(containerId);
+        }
+      } catch (ResourceHandlerException e) {
+        LOG.warn("ResourceHandlerChain.postComplete failed for " +
+            "containerId: " + containerId + ". Exception: " + e);
+      }
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
@@ -346,9 +425,28 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   public int reacquireContainer(String user, ContainerId containerId)
       throws IOException, InterruptedException {
     try {
+      //Resource handler chain needs to reacquire container state
+      //as well
+      if (resourceHandlerChain != null) {
+        try {
+          resourceHandlerChain.reacquireContainer(containerId);
+        } catch (ResourceHandlerException e) {
+          LOG.warn("ResourceHandlerChain.reacquireContainer failed for " +
+              "containerId: " + containerId + " Exception: " + e);
+        }
+      }
+
       return super.reacquireContainer(user, containerId);
     } finally {
       resourcesHandler.postExecute(containerId);
+      if (resourceHandlerChain != null) {
+        try {
+          resourceHandlerChain.postComplete(containerId);
+        } catch (ResourceHandlerException e) {
+          LOG.warn("ResourceHandlerChain.postComplete failed for " +
+              "containerId: " + containerId + " Exception: " + e);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
new file mode 100644
index 0000000..c814c89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
@@ -0,0 +1,29 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface OutboundBandwidthResourceHandler extends ResourceHandler {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
new file mode 100644
index 0000000..30fc951
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -0,0 +1,128 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides mechanisms to get various resource handlers - cpu, memory, network,
+ * disk, etc. - based on configuration.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ResourceHandlerModule {
+  private volatile static ResourceHandlerChain resourceHandlerChain;
+
+  /**
+   * This specific implementation might provide resource management as well
+   * as resource metrics functionality. We need to ensure that the same
+   * instance is used for both.
+   */
+  private volatile static TrafficControlBandwidthHandlerImpl
+      trafficControlBandwidthHandler;
+  private volatile static CGroupsHandler cGroupsHandler;
+
+  /**
+   * Returns an initialized, thread-safe CGroupsHandler instance
+   */
+  public static CGroupsHandler getCGroupsHandler(Configuration conf)
+      throws ResourceHandlerException {
+    if (cGroupsHandler == null) {
+      synchronized (CGroupsHandler.class) {
+        if (cGroupsHandler == null) {
+          cGroupsHandler = new CGroupsHandlerImpl(conf,
+              PrivilegedOperationExecutor.getInstance(conf));
+        }
+      }
+    }
+
+    return cGroupsHandler;
+  }
+
+  private static TrafficControlBandwidthHandlerImpl
+  getTrafficControlBandwidthHandler(Configuration conf)
+      throws ResourceHandlerException {
+    if (conf.getBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
+        YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_ENABLED)) {
+      if (trafficControlBandwidthHandler == null) {
+        synchronized (OutboundBandwidthResourceHandler.class) {
+          if (trafficControlBandwidthHandler == null) {
+            trafficControlBandwidthHandler = new
+                TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
+                .getInstance(conf), getCGroupsHandler(conf),
+                new TrafficController(conf, PrivilegedOperationExecutor
+                    .getInstance(conf)));
+          }
+        }
+      }
+
+      return trafficControlBandwidthHandler;
+    } else {
+      return null;
+    }
+  }
+
+  public static OutboundBandwidthResourceHandler
+  getOutboundBandwidthResourceHandler(Configuration conf)
+      throws ResourceHandlerException {
+    return getTrafficControlBandwidthHandler(conf);
+  }
+
+  private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
+      ResourceHandler handler) {
+    if (handler != null) {
+      handlerList.add(handler);
+    }
+  }
+
+  private static void initializeConfiguredResourceHandlerChain(
+      Configuration conf) throws ResourceHandlerException {
+    ArrayList<ResourceHandler> handlerList = new ArrayList<>();
+
+    addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
+    resourceHandlerChain = new ResourceHandlerChain(handlerList);
+  }
+
+  public static ResourceHandlerChain getConfiguredResourceHandlerChain
+      (Configuration conf) throws ResourceHandlerException {
+    if (resourceHandlerChain == null) {
+      synchronized (ResourceHandlerModule.class) {
+        if (resourceHandlerChain == null) {
+          initializeConfiguredResourceHandlerChain(conf);
+        }
+      }
+    }
+
+    if (resourceHandlerChain.getResourceHandlerList().size() != 0) {
+      return resourceHandlerChain;
+    } else {
+      return null;
+    }
+  }
+}
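
A brief usage sketch of the module above, mirroring the LinuxContainerExecutor
changes earlier in this patch (conf is assumed to be the NodeManager
configuration):

    try {
      ResourceHandlerChain chain =
          ResourceHandlerModule.getConfiguredResourceHandlerChain(conf);
      if (chain != null) {
        // Mounts the net_cls cgroup controller and bootstraps the tc shaping
        // hierarchy when NM_NETWORK_RESOURCE_ENABLED is set.
        chain.bootstrap(conf);
      }
    } catch (ResourceHandlerException e) {
      throw new IOException("Failed to bootstrap configured resource subsystems!", e);
    }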

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
new file mode 100644
index 0000000..a0327a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
@@ -0,0 +1,281 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TrafficControlBandwidthHandlerImpl
+    implements OutboundBandwidthResourceHandler {
+
+  private static final Log LOG = LogFactory
+      .getLog(TrafficControlBandwidthHandlerImpl.class);
+  //In the absence of 'scheduling' support, we'll 'infer' the guaranteed
+  //outbound bandwidth for each container based on this number. This will
+  //likely go away once we add support on the RM for this resource type.
+  private static final int MAX_CONTAINER_COUNT = 50;
+
+  private final PrivilegedOperationExecutor privilegedOperationExecutor;
+  private final CGroupsHandler cGroupsHandler;
+  private final TrafficController trafficController;
+  private final ConcurrentHashMap<ContainerId, Integer> containerIdClassIdMap;
+
+  private Configuration conf;
+  private String device;
+  private boolean strictMode;
+  private int containerBandwidthMbit;
+  private int rootBandwidthMbit;
+  private int yarnBandwidthMbit;
+
+  public TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
+      privilegedOperationExecutor, CGroupsHandler cGroupsHandler,
+      TrafficController trafficController) {
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+    this.cGroupsHandler = cGroupsHandler;
+    this.trafficController = trafficController;
+    this.containerIdClassIdMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Bootstrapping 'outbound-bandwidth' resource handler - mounts net_cls
+   * controller and bootstraps a traffic control bandwidth shaping hierarchy
+   * @param configuration yarn configuration in use
+   * @return (potentially empty) list of privileged operations to execute.
+   * @throws ResourceHandlerException
+   */
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+    conf = configuration;
+    //We'll do this inline for the time being - since this is a one time
+    //operation. At some point, LCE code can be refactored to batch mount
+    //operations across multiple controllers - cpu, net_cls, blkio etc
+    cGroupsHandler
+        .mountCGroupController(CGroupsHandler.CGroupController.NET_CLS);
+    device = conf.get(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE,
+        YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE);
+    strictMode = configuration.getBoolean(YarnConfiguration
+        .NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, YarnConfiguration
+        .DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
+    rootBandwidthMbit = conf.getInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, YarnConfiguration
+        .DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT);
+    yarnBandwidthMbit = conf.getInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, rootBandwidthMbit);
+    containerBandwidthMbit = (int) Math.ceil((double) yarnBandwidthMbit /
+        MAX_CONTAINER_COUNT);
+
+    StringBuffer logLine = new StringBuffer("strict mode is set to :")
+        .append(strictMode).append(System.lineSeparator());
+
+    if (strictMode) {
+      logLine.append("container bandwidth will be capped to soft limit.")
+          .append(System.lineSeparator());
+    } else {
+      logLine.append(
+          "containers will be allowed to use spare YARN bandwidth.")
+          .append(System.lineSeparator());
+    }
+
+    logLine
+        .append("containerBandwidthMbit soft limit (in mbit/sec) is set to : ")
+        .append(containerBandwidthMbit);
+
+    LOG.info(logLine);
+    trafficController.bootstrap(device, rootBandwidthMbit, yarnBandwidthMbit);
+
+    return null;
+  }
+
+  /**
+   * Pre-start hook for 'outbound-bandwidth' resource. A cgroup is created
+   * and a net_cls classid is generated and written to a cgroup file. A
+   * traffic control shaping rule is created in order to limit outbound
+   * bandwidth utilization.
+   * @param container Container being launched
+   * @return privileged operations for some cgroups/tc operations.
+   * @throws ResourceHandlerException
+   */
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+    String containerIdStr = container.getContainerId().toString();
+    int classId = trafficController.getNextClassId();
+    String classIdStr = trafficController.getStringForNetClsClassId(classId);
+
+    cGroupsHandler.createCGroup(CGroupsHandler.CGroupController
+            .NET_CLS,
+        containerIdStr);
+    cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS,
+        containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr);
+    containerIdClassIdMap.put(container.getContainerId(), classId);
+
+    //Now create a privileged operation in order to update the tasks file with
+    //the pid of the running container process (root of process tree). This can
+    //only be done at the time of launching the container, in a privileged
+    //executable.
+    String tasksFile = cGroupsHandler.getPathForCGroupTasks(
+        CGroupsHandler.CGroupController.NET_CLS, containerIdStr);
+    String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX)
+        .append(tasksFile).toString();
+    List<PrivilegedOperation> ops = new ArrayList<>();
+
+    ops.add(new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg));
+
+    //Create a privileged operation to create a tc rule for this container
+    //We'll return this to the calling (Linux) Container Executor
+    //implementation for batching optimizations so that we don't fork/exec
+    //additional times during container launch.
+    TrafficController.BatchBuilder builder = trafficController.new
+        BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE);
+
+    builder.addContainerClass(classId, containerBandwidthMbit, strictMode);
+    ops.add(builder.commitBatchToTempFile());
+
+    return ops;
+  }
+
+  /**
+   * Reacquires state for a container - reads the classid from the cgroup
+   * being used for the container being reacquired
+   * @param containerId id of the container being reacquired.
+   * @return (potentially empty) list of privileged operations
+   * @throws ResourceHandlerException
+   */
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    String containerIdStr = containerId.toString();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Attempting to reacquire classId for container: " +
+          containerIdStr);
+    }
+
+    String classIdStrFromFile = cGroupsHandler.getCGroupParam(
+        CGroupsHandler.CGroupController.NET_CLS, containerIdStr,
+        CGroupsHandler.CGROUP_PARAM_CLASSID);
+    int classId = trafficController
+        .getClassIdFromFileContents(classIdStrFromFile);
+
+    LOG.info("Reacquired containerId -> classId mapping: " + containerIdStr
+        + " -> " + classId);
+    containerIdClassIdMap.put(containerId, classId);
+
+    return null;
+  }
+
+  /**
+   * Returns total bytes sent per container to be used for metrics tracking
+   * purposes.
+   * @return a map of containerId to bytes sent
+   * @throws ResourceHandlerException
+   */
+  public Map<ContainerId, Integer> getBytesSentPerContainer()
+      throws ResourceHandlerException {
+    Map<Integer, Integer> classIdStats = trafficController.readStats();
+    Map<ContainerId, Integer> containerIdStats = new HashMap<>();
+
+    for (Map.Entry<ContainerId, Integer> entry : containerIdClassIdMap
+        .entrySet()) {
+      ContainerId containerId = entry.getKey();
+      Integer classId = entry.getValue();
+      Integer bytesSent = classIdStats.get(classId);
+
+      if (bytesSent == null) {
+        LOG.warn("No bytes sent metric found for container: " + containerId +
+            " with classId: " + classId);
+        continue;
+      }
+      containerIdStats.put(containerId, bytesSent);
+    }
+
+    return containerIdStats;
+  }
+
+  /**
+   * Cleanup operations once container is completed - deletes cgroup and
+   * removes traffic shaping rule(s).
+   * @param containerId of the container that was completed.
+   * @return null - the tc class deletion is executed inline, so no further
+   * privileged operations are returned.
+   * @throws ResourceHandlerException
+   */
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    LOG.info("postComplete for container: " + containerId.toString());
+    cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS,
+        containerId.toString());
+
+    Integer classId = containerIdClassIdMap.get(containerId);
+
+    if (classId != null) {
+      PrivilegedOperation op = trafficController.new
+          BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .deleteContainerClass(classId).commitBatchToTempFile();
+
+      try {
+        privilegedOperationExecutor.executePrivilegedOperation(op, false);
+        trafficController.releaseClassId(classId);
+      } catch (PrivilegedOperationException e) {
+        LOG.warn("Failed to delete tc rule for classId: " + classId);
+        throw new ResourceHandlerException(
+            "Failed to delete tc rule for classId:" + classId);
+      }
+    } else {
+      LOG.warn("Not cleaning up tc rules. classId unknown for container: " +
+          containerId.toString());
+    }
+
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown()
+      throws ResourceHandlerException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("teardown(): Nothing to do");
+    }
+
+    return null;
+  }
+}
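
A hedged sketch of how the per-container statistics exposed above might be
consumed for metrics; handler is assumed to be the already bootstrapped
TrafficControlBandwidthHandlerImpl instance managed by ResourceHandlerModule:

    try {
      Map<ContainerId, Integer> bytesSent = handler.getBytesSentPerContainer();
      for (Map.Entry<ContainerId, Integer> entry : bytesSent.entrySet()) {
        // One entry per container that has a net_cls classid mapping.
        LOG.info("Container " + entry.getKey() + " sent "
            + entry.getValue() + " bytes");
      }
    } catch (ResourceHandlerException e) {
      LOG.warn("Failed to read per-container tc stats", e);
    }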

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
new file mode 100644
index 0000000..e33cea4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
@@ -0,0 +1,650 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Wrapper around the 'tc' tool. Provides access to a very specific subset of
+ * the functionality provided by the tc tool.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable class TrafficController {
+  private static final Log LOG = LogFactory.getLog(TrafficController.class);
+  private static final int ROOT_QDISC_HANDLE = 42;
+  private static final int ZERO_CLASS_ID = 0;
+  private static final int ROOT_CLASS_ID = 1;
+  /** Traffic shaping class used for all unclassified traffic */
+  private static final int DEFAULT_CLASS_ID = 2;
+  /** Traffic shaping class used for all YARN traffic */
+  private static final int YARN_ROOT_CLASS_ID = 3;
+  /** Classes 0-3 are used already. We need to ensure that container classes
+   * do not collide with these classids.
+   */
+  private static final int MIN_CONTAINER_CLASS_ID = 4;
+  /** This is the number of distinct (container) traffic shaping classes
+   * that are supported */
+  private static final int MAX_CONTAINER_CLASSES = 1024;
+
+  private static final String MBIT_SUFFIX = "mbit";
+  private static final String TMP_FILE_PREFIX = "tc.";
+  private static final String TMP_FILE_SUFFIX = ".cmds";
+
+  /** Root queuing discipline attached to the root of the interface */
+  private static final String FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT =
+      "qdisc add dev %s root handle %d: htb default %s";
+  /** Specifies a cgroup/classid based filter - based on the classid associated
+   * with the outbound packet, the corresponding traffic shaping rule is used.
+   * Please see tc documentation for additional details.
+   */
+  private static final String FORMAT_FILTER_CGROUP_ADD_TO_PARENT =
+      "filter add dev %s parent %d: protocol ip prio 10 handle 1: cgroup";
+  /** Standard format for adding a traffic shaping class to a parent, with
+   * the specified bandwidth limits
+   */
+  private static final String FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES =
+      "class add dev %s parent %d:%d classid %d:%d htb rate %s ceil %s";
+  /** Standard format to delete a traffic shaping class */
+  private static final String FORMAT_DELETE_CLASS =
+      "class del dev %s classid %d:%d";
+  /** Format of the classid that is to be used with the net_cls cgroup. Needs
+   * to be of the form 0xAAAABBBB */
+  private static final String FORMAT_NET_CLS_CLASS_ID = "0x%04d%04d";
+  /** Commands to read the qdisc(s)/filter(s)/class(es) associated with an
+   * interface
+   */
+  private static final String FORMAT_READ_STATE =
+      "qdisc show dev %1$s%n" +
+          "filter show dev %1$s%n" +
+          "class show dev %1$s";
+  private static final String FORMAT_READ_CLASSES = "class show dev %s";
+  /** Delete a qdisc and all its children - classes/filters etc */
+  private static final String FORMAT_WIPE_STATE =
+      "qdisc del dev %s parent root";
+
+  private final Configuration conf;
+  //Used to store the set of classids in use for container classes
+  private final BitSet classIdSet;
+  private final PrivilegedOperationExecutor privilegedOperationExecutor;
+
+  private String tmpDirPath;
+  private String device;
+  private int rootBandwidthMbit;
+  private int yarnBandwidthMbit;
+  private int defaultClassBandwidthMbit;
+
+  TrafficController(Configuration conf, PrivilegedOperationExecutor exec) {
+    this.conf = conf;
+    this.classIdSet = new BitSet(MAX_CONTAINER_CLASSES);
+    this.privilegedOperationExecutor = exec;
+  }
+
+  /**
+   * Bootstrap tc configuration
+   */
+  public void bootstrap(String device, int rootBandwidthMbit, int
+      yarnBandwidthMbit)
+      throws ResourceHandlerException {
+    if (device == null) {
+      throw new ResourceHandlerException("device cannot be null!");
+    }
+
+    String tmpDirBase = conf.get("hadoop.tmp.dir");
+    if (tmpDirBase == null) {
+      throw new ResourceHandlerException("hadoop.tmp.dir not set!");
+    }
+    tmpDirPath = tmpDirBase + "/nm-tc-rules";
+
+    File tmpDir = new File(tmpDirPath);
+    if (!(tmpDir.exists() || tmpDir.mkdirs())) {
+      LOG.warn("Unable to create directory: " + tmpDirPath);
+      throw new ResourceHandlerException("Unable to create directory: " +
+          tmpDirPath);
+    }
+
+    this.device = device;
+    this.rootBandwidthMbit = rootBandwidthMbit;
+    this.yarnBandwidthMbit = yarnBandwidthMbit;
+    defaultClassBandwidthMbit = (rootBandwidthMbit - yarnBandwidthMbit) <= 0
+        ? rootBandwidthMbit : (rootBandwidthMbit - yarnBandwidthMbit);
+
+    boolean recoveryEnabled = conf.getBoolean(YarnConfiguration
+        .NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
+    String state = null;
+
+    if (!recoveryEnabled) {
+      LOG.info("NM recovery is not enabled. We'll wipe tc state before proceeding.");
+    } else {
+      //NM recovery enabled - run a state check
+      state = readState();
+      if (checkIfAlreadyBootstrapped(state)) {
+        LOG.info("TC configuration is already in place. Not wiping state.");
+
+        //We already have the list of existing container classes, if any
+        //that were created after bootstrapping
+        reacquireContainerClasses(state);
+        return;
+      } else {
+        LOG.info("TC configuration is incomplete. Wiping tc state before proceeding");
+      }
+    }
+
+    wipeState(); //start over in case previous bootstrap was incomplete
+    initializeState();
+  }
+
+  private void initializeState() throws ResourceHandlerException {
+    LOG.info("Initializing tc state.");
+
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_MODIFY_STATE)
+        .addRootQDisc()
+        .addCGroupFilter()
+        .addClassToRootQDisc(rootBandwidthMbit)
+        .addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit)
+            //yarn bandwidth is capped with rate = ceil
+        .addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit);
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      privilegedOperationExecutor.executePrivilegedOperation(op, false);
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to bootstrap outbound bandwidth configuration");
+
+      throw new ResourceHandlerException(
+          "Failed to bootstrap outbound bandwidth configuration", e);
+    }
+  }
+
+  /**
+   * Function to check if the interface in use has already been fully
+   * bootstrapped with the required tc configuration
+   *
+   * @return boolean indicating the result of the check
+   */
+  private boolean checkIfAlreadyBootstrapped(String state)
+      throws ResourceHandlerException {
+    List<String> regexes = new ArrayList<>();
+
+    //root qdisc
+    regexes.add(String.format("^qdisc htb %d: root(.)*$",
+        ROOT_QDISC_HANDLE));
+    //cgroup filter
+    regexes.add(String.format("^filter parent %d: protocol ip " +
+        "(.)*cgroup(.)*$", ROOT_QDISC_HANDLE));
+    //root, default and yarn classes
+    regexes.add(String.format("^class htb %d:%d root(.)*$",
+        ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
+    regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
+        ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
+    regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
+        ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE,
+        ROOT_CLASS_ID));
+
+    for (String regex : regexes) {
+      Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
+
+      if (pattern.matcher(state).find()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Matched regex: " + regex);
+        }
+      } else {
+        String logLine = new StringBuffer("Failed to match regex: ")
+              .append(regex).append(" Current state: ").append(state).toString();
+        LOG.warn(logLine);
+        return false;
+      }
+    }
+
+    LOG.info("Bootstrap check succeeded");
+
+    return true;
+  }
+
+  private String readState() throws ResourceHandlerException {
+    //Sample state output:
+    //    qdisc htb 42: root refcnt 2 r2q 10 default 2 direct_packets_stat 0
+    //    filter parent 42: protocol ip pref 10 cgroup handle 0x1
+    //
+    //    filter parent 42: protocol ip pref 10 cgroup handle 0x1
+    //
+    //    class htb 42:1 root rate 10000Kbit ceil 10000Kbit burst 1600b cburst 1600b
+    //    class htb 42:2 parent 42:1 prio 0 rate 3000Kbit ceil 10000Kbit burst 1599b cburst 1600b
+    //    class htb 42:3 parent 42:1 prio 0 rate 7000Kbit ceil 7000Kbit burst 1598b cburst 1598b
+
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_READ_STATE)
+        .readState();
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      String output =
+          privilegedOperationExecutor.executePrivilegedOperation(op, true);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("TC state: %n" + output);
+      }
+
+      return output;
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to bootstrap outbound bandwidth rules");
+      throw new ResourceHandlerException(
+          "Failed to bootstrap outbound bandwidth rules", e);
+    }
+  }
+
+  private void wipeState() throws ResourceHandlerException {
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_MODIFY_STATE)
+        .wipeState();
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      LOG.info("Wiping tc state.");
+      privilegedOperationExecutor.executePrivilegedOperation(op, false);
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to wipe tc state. This could happen if the interface" +
+          " is already in its default state. Ignoring.");
+      //Ignoring this exception. This could happen if the interface is already
+      //in its default state. For this reason we don't throw a
+      //ResourceHandlerException here.
+    }
+  }
+
+  /**
+   * Parses the current state and looks for classids already in use
+   */
+  private void reacquireContainerClasses(String state) {
+    //At this point we have already successfully passed
+    //checkIfAlreadyBootstrapped() - so we know that at least the
+    //root classes are in place.
+    String tcClassesStr = state.substring(state.indexOf("class"));
+    //one class per line - the results of the split will need to trimmed
+    String[] tcClasses = Pattern.compile("$", Pattern.MULTILINE)
+        .split(tcClassesStr);
+    Pattern tcClassPattern = Pattern.compile(String.format(
+        "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
+
+    synchronized (classIdSet) {
+      for (String tcClassSplit : tcClasses) {
+        String tcClass = tcClassSplit.trim();
+
+        if (!tcClass.isEmpty()) {
+          Matcher classMatcher = tcClassPattern.matcher(tcClass);
+          if (classMatcher.matches()) {
+            int classId = Integer.parseInt(classMatcher.group(1));
+            if (classId >= MIN_CONTAINER_CLASS_ID) {
+              classIdSet.set(classId - MIN_CONTAINER_CLASS_ID);
+              LOG.info("Reacquired container classid: " + classId);
+            }
+          } else {
+            LOG.warn("Unable to match classid in string:" + tcClass);
+          }
+        }
+      }
+    }
+  }
+
+  public Map<Integer, Integer> readStats() throws ResourceHandlerException {
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_READ_STATS)
+        .readClasses();
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      String output =
+          privilegedOperationExecutor.executePrivilegedOperation(op, true);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("TC stats output:" + output);
+      }
+
+      Map<Integer, Integer> classIdBytesStats = parseStatsString(output);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("classId -> bytes sent %n" + classIdBytesStats);
+      }
+
+      return classIdBytesStats;
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to get tc stats");
+      throw new ResourceHandlerException("Failed to get tc stats", e);
+    }
+  }
+
+  private Map<Integer, Integer> parseStatsString(String stats) {
+    //Example class stats segment (multiple present in tc output)
+    //  class htb 42:4 parent 42:3 prio 0 rate 1000Kbit ceil 7000Kbit burst 1600b cburst 1598b
+    //   Sent 77921300 bytes 52617 pkt (dropped 0, overlimits 0 requeues 0)
+    //   rate 6973Kbit 589pps backlog 0b 39p requeues 0
+    //   lended: 3753 borrowed: 22514 giants: 0
+    //   tokens: -122164 ctokens: -52488
+
+    String[] lines = Pattern.compile("$", Pattern.MULTILINE)
+        .split(stats);
+    Pattern tcClassPattern = Pattern.compile(String.format(
+        "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
+    Pattern bytesPattern = Pattern.compile("Sent (\\d+) bytes.*");
+
+    int currentClassId = -1;
+    Map<Integer, Integer> containerClassIdStats = new HashMap<>();
+
+    for (String lineSplit : lines) {
+      String line = lineSplit.trim();
+
+      if (!line.isEmpty()) {
+        //Check if we encountered a stats segment for a container class
+        Matcher classMatcher = tcClassPattern.matcher(line);
+        if (classMatcher.matches()) {
+          int classId = Integer.parseInt(classMatcher.group(1));
+          if (classId >= MIN_CONTAINER_CLASS_ID) {
+            currentClassId = classId;
+            continue;
+          }
+        }
+
+        //Check if we encountered a stats line
+        Matcher bytesMatcher = bytesPattern.matcher(line);
+        if (bytesMatcher.matches()) {
+          //we found at least one class segment
+          if (currentClassId != -1) {
+            int bytes = Integer.parseInt(bytesMatcher.group(1));
+            containerClassIdStats.put(currentClassId, bytes);
+          } else {
+            LOG.warn("Matched a 'bytes sent' line outside of a class stats " +
+                  "segment : " + line);
+          }
+          continue;
+        }
+
+        //skip other kinds of non-empty lines - since we aren't interested in
+        //them.
+      }
+    }
+
+    return containerClassIdStats;
+  }
+
+  /**
+   * Returns a formatted string for attaching a qdisc to the root of the
+   * device/interface. Additional qdisc
+   * parameters can be supplied - for example, the default 'class' to use for
+   * incoming packets
+   */
+  private String getStringForAddRootQDisc() {
+    return String.format(FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT, device,
+        ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID);
+  }
+
+  /**
+   * Returns a formatted string for a filter that matches packets based on the
+   * presence of net_cls classids
+   */
+  private String getStringForAddCGroupFilter() {
+    return String.format(FORMAT_FILTER_CGROUP_ADD_TO_PARENT, device,
+        ROOT_QDISC_HANDLE);
+  }
+
+  /**
+   * Get the next available classid. This has to be released post container
+   * complete
+   */
+  public int getNextClassId() throws ResourceHandlerException {
+    synchronized (classIdSet) {
+      int index = classIdSet.nextClearBit(0);
+      if (index >= MAX_CONTAINER_CLASSES) {
+        throw new ResourceHandlerException("Reached max container classes: "
+            + MAX_CONTAINER_CLASSES);
+      }
+      classIdSet.set(index);
+      return (index + MIN_CONTAINER_CLASS_ID);
+    }
+  }
+
+  public void releaseClassId(int classId) throws ResourceHandlerException {
+    synchronized (classIdSet) {
+      int index = classId - MIN_CONTAINER_CLASS_ID;
+      if (index < 0 || index >= MAX_CONTAINER_CLASSES) {
+        throw new ResourceHandlerException("Invalid incoming classId: "
+            + classId);
+      }
+      classIdSet.clear(index);
+    }
+  }
+
+  /**
+   * Returns a formatted string representing the given classId including a
+   * handle
+   */
+  public String getStringForNetClsClassId(int classId) {
+    return String.format(FORMAT_NET_CLS_CLASS_ID, ROOT_QDISC_HANDLE, classId);
+  }
+
+  /**
+   * A value read out of net_cls.classid file is in decimal form. We need to
+   * convert to 32-bit/8 digit hex, extract the lower 16-bit/four digits
+   * as an int
+   */
+  public int getClassIdFromFileContents(String input) {
+    //convert from decimal back to fixed size hex form
+    //e.g 4325381 -> 00420005
+    String classIdStr = String.format("%08x", Integer.parseInt(input));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ClassId hex string : " + classIdStr);
+    }
+
+    //extract and return 4 digits
+    //e.g 00420005 -> 0005
+    return Integer.parseInt(classIdStr.substring(4));
+  }
+
+  /**
+   * Adds a tc class to qdisc at root
+   */
+  private String getStringForAddClassToRootQDisc(int rateMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:0 classid 42:1 htb rate 1000mbit
+    // ceil 1000mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, ZERO_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID,
+        rateMbitStr, rateMbitStr);
+  }
+
+  private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:1 classid 42:2 htb rate 300mbit
+    // ceil 1000mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID,
+        rateMbitStr, ceilMbitStr);
+  }
+
+  private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:1 classid 42:3 htb rate 700mbit
+    // ceil 1000mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID,
+        rateMbitStr, ceilMbitStr);
+  }
+
+  private String getStringForAddContainerClass(int classId, int rateMbit, int
+      ceilMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:99 classid 42:99 htb rate 50mbit
+    // ceil 700mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE, classId,
+        rateMbitStr, ceilMbitStr);
+  }
+
+  private String getStringForDeleteContainerClass(int classId) {
+    //example "class del dev eth0 classid 42:7"
+    return String.format(FORMAT_DELETE_CLASS, device, ROOT_QDISC_HANDLE,
+        classId);
+  }
+
+  private String getStringForReadState() {
+    return String.format(FORMAT_READ_STATE, device);
+  }
+
+  private String getStringForReadClasses() {
+    return String.format(FORMAT_READ_CLASSES, device);
+  }
+
+  private String getStringForWipeState() {
+    return String.format(FORMAT_WIPE_STATE, device);
+  }
+
+  public class BatchBuilder {
+    final PrivilegedOperation operation;
+    final List<String> commands;
+
+    public BatchBuilder(PrivilegedOperation.OperationType opType)
+        throws ResourceHandlerException {
+      switch (opType) {
+      case TC_MODIFY_STATE:
+      case TC_READ_STATE:
+      case TC_READ_STATS:
+        operation = new PrivilegedOperation(opType, (String) null);
+        commands = new ArrayList<>();
+        break;
+      default:
+        throw new ResourceHandlerException("Not a tc operation type : " +
+            opType);
+      }
+    }
+
+    private BatchBuilder addRootQDisc() {
+      commands.add(getStringForAddRootQDisc());
+      return this;
+    }
+
+    private BatchBuilder addCGroupFilter() {
+      commands.add(getStringForAddCGroupFilter());
+      return this;
+    }
+
+    private BatchBuilder addClassToRootQDisc(int rateMbit) {
+      commands.add(getStringForAddClassToRootQDisc(rateMbit));
+      return this;
+    }
+
+    private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit) {
+      commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit));
+      return this;
+    }
+
+    private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit) {
+      commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit));
+      return this;
+    }
+
+    public BatchBuilder addContainerClass(int classId, int rateMbit, boolean
+        strictMode) {
+      int ceilMbit;
+
+      if (strictMode) {
+        ceilMbit = rateMbit;
+      } else {
+        ceilMbit = yarnBandwidthMbit;
+      }
+
+      commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit));
+      return this;
+    }
+
+    public BatchBuilder deleteContainerClass(int classId) {
+      commands.add(getStringForDeleteContainerClass(classId));
+      return this;
+    }
+
+    private BatchBuilder readState() {
+      commands.add(getStringForReadState());
+      return this;
+    }
+
+    private BatchBuilder readClasses() {
+      //We'll read all classes, but use a different tc operation type
+      //for reading stats for all these classes. Stats are fetched using a
+      //different tc cli option (-s).
+      commands.add(getStringForReadClasses());
+      return this;
+    }
+
+    private BatchBuilder wipeState() {
+      commands.add(getStringForWipeState());
+      return this;
+    }
+
+    public PrivilegedOperation commitBatchToTempFile()
+        throws ResourceHandlerException {
+      try {
+        File tcCmds = File.createTempFile(TMP_FILE_PREFIX, TMP_FILE_SUFFIX, new
+            File(tmpDirPath));
+        Writer writer = new OutputStreamWriter(new FileOutputStream(tcCmds),
+            "UTF-8");
+        PrintWriter printWriter = new PrintWriter(writer);
+
+        for (String command : commands) {
+          printWriter.println(command);
+        }
+
+        printWriter.close();
+        operation.appendArgs(tcCmds.getAbsolutePath());
+
+        return operation;
+      } catch (IOException e) {
+        LOG.warn("Failed to create or write to temporary file in dir: " +
+            tmpDirPath);
+        throw new ResourceHandlerException(
+            "Failed to create or write to temporary file in dir: "
+                + tmpDirPath);
+      }
+    }
+  } //end BatchBuilder
+}
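
A quick usage sketch of the BatchBuilder above may help reviewers: it batches
tc commands into a temp file that is then executed as a single TC_MODIFY_STATE
operation via the PrivilegedOperationExecutor. The sketch below is hypothetical
(it is not part of this patch) and assumes only the constructor and methods
exercised by the tests further down; the device name and rates are
illustrative, not defaults.

  //Hypothetical caller sketch, not part of this patch.
  private PrivilegedOperation addContainerClassSketch(Configuration conf,
      PrivilegedOperationExecutor executor) throws ResourceHandlerException {
    TrafficController trafficController = new TrafficController(conf, executor);
    //eth0 / 1000mbit total / 700mbit for YARN are illustrative values
    trafficController.bootstrap("eth0", 1000, 700);

    int classId = trafficController.getNextClassId();
    //strictMode=false: ceil is the total YARN bandwidth, not the container rate
    return trafficController.new BatchBuilder(
        PrivilegedOperation.OperationType.TC_MODIFY_STATE)
        .addContainerClass(classId, 50, false)
        .commitBatchToTempFile();
  }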

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
new file mode 100644
index 0000000..939dfe7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
@@ -0,0 +1,78 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestResourceHandlerModule {
+  private static final Log LOG = LogFactory.
+      getLog(TestResourceHandlerModule.class);
+  Configuration emptyConf;
+  Configuration networkEnabledConf;
+
+  @Before
+  public void setup() {
+    emptyConf = new YarnConfiguration();
+    networkEnabledConf = new YarnConfiguration();
+
+    networkEnabledConf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
+        true);
+    //We need to bypass mtab parsing for figuring out cgroups mount locations
+    networkEnabledConf.setBoolean(YarnConfiguration
+        .NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
+  }
+
+  @Test
+  public void testOutboundBandwidthHandler() {
+    try {
+      //This resourceHandler should be non-null only if network as a resource
+      //is explicitly enabled
+      OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule
+          .getOutboundBandwidthResourceHandler(emptyConf);
+      Assert.assertNull(resourceHandler);
+
+      //When network as a resource is enabled this should be non-null
+      resourceHandler = ResourceHandlerModule
+          .getOutboundBandwidthResourceHandler(networkEnabledConf);
+      Assert.assertNotNull(resourceHandler);
+
+      //Ensure that outbound bandwidth resource handler is present in the chain
+      ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule
+          .getConfiguredResourceHandlerChain(networkEnabledConf);
+      List<ResourceHandler> resourceHandlers = resourceHandlerChain
+          .getResourceHandlerList();
+      //Exactly one resource handler in chain
+      Assert.assertEquals(1, resourceHandlers.size());
+      //Same instance is expected to be in the chain.
+      Assert.assertSame(resourceHandler, resourceHandlers.get(0));
+    } catch (ResourceHandlerException e) {
+      Assert.fail("Unexpected ResourceHandlerException: " + e);
+    }
+  }
+}
\ No newline at end of file
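
The test above also doubles as a recipe for turning the feature on: the
outbound bandwidth handler is only instantiated when network as a resource is
explicitly enabled. A minimal configuration sketch (hypothetical, not part of
this patch), using only the YarnConfiguration keys that appear in these tests
and illustrative bandwidth values:

  //Hypothetical sketch, not part of this patch.
  OutboundBandwidthResourceHandler getOutboundHandlerSketch()
      throws ResourceHandlerException {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, true);
    conf.setInt(
        YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, 1000);
    conf.setInt(
        YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, 700);
    //Returns null unless network as a resource is explicitly enabled above.
    return ResourceHandlerModule.getOutboundBandwidthResourceHandler(conf);
  }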

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
new file mode 100644
index 0000000..50ad6b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
@@ -0,0 +1,231 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.util.List;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestTrafficControlBandwidthHandlerImpl {
+  private static final Log LOG =
+      LogFactory.getLog(TestTrafficControlBandwidthHandlerImpl.class);
+  private static final int ROOT_BANDWIDTH_MBIT = 100;
+  private static final int YARN_BANDWIDTH_MBIT = 70;
+  private static final int TEST_CLASSID = 100;
+  private static final String TEST_CLASSID_STR = "42:100";
+  private static final String TEST_CONTAINER_ID_STR = "container_01";
+  private static final String TEST_TASKS_FILE = "testTasksFile";
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+  private CGroupsHandler cGroupsHandlerMock;
+  private TrafficController trafficControllerMock;
+  private Configuration conf;
+  private String tmpPath;
+  private String device;
+  ContainerId containerIdMock;
+  Container containerMock;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    cGroupsHandlerMock = mock(CGroupsHandler.class);
+    trafficControllerMock = mock(TrafficController.class);
+    conf = new YarnConfiguration();
+    tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
+        ('/').append("hadoop.tmp.dir").toString();
+    device = YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE;
+    containerIdMock = mock(ContainerId.class);
+    containerMock = mock(Container.class);
+    when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR);
+    //mock returning a mock - an angel died somewhere.
+    when(containerMock.getContainerId()).thenReturn(containerIdMock);
+
+    conf.setInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, ROOT_BANDWIDTH_MBIT);
+    conf.setInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, YARN_BANDWIDTH_MBIT);
+    conf.set("hadoop.tmp.dir", tmpPath);
+    //In these tests, we'll only use TrafficController with recovery disabled
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+  }
+
+  @Test
+  public void testBootstrap() {
+    TrafficControlBandwidthHandlerImpl handlerImpl = new
+        TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+        cGroupsHandlerMock, trafficControllerMock);
+
+    try {
+      handlerImpl.bootstrap(conf);
+      verify(cGroupsHandlerMock).mountCGroupController(
+          eq(CGroupsHandler.CGroupController.NET_CLS));
+      verifyNoMoreInteractions(cGroupsHandlerMock);
+      verify(trafficControllerMock).bootstrap(eq(device),
+          eq(ROOT_BANDWIDTH_MBIT),
+          eq(YARN_BANDWIDTH_MBIT));
+      verifyNoMoreInteractions(trafficControllerMock);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected ResourceHandlerException!");
+    }
+  }
+
+  @Test
+  public void testLifeCycle() {
+    TrafficController trafficControllerSpy = spy(new TrafficController(conf,
+        privilegedOperationExecutorMock));
+    TrafficControlBandwidthHandlerImpl handlerImpl = new
+        TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+        cGroupsHandlerMock, trafficControllerSpy);
+
+    try {
+      handlerImpl.bootstrap(conf);
+      testPreStart(trafficControllerSpy, handlerImpl);
+      testPostComplete(trafficControllerSpy, handlerImpl);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected ResourceHandlerException!");
+    }
+  }
+
+  private void testPreStart(TrafficController trafficControllerSpy,
+      TrafficControlBandwidthHandlerImpl handlerImpl) throws
+      ResourceHandlerException {
+    //This is not the cleanest of solutions - but since we are testing the
+    //preStart/postComplete lifecycle, we don't have a different way of
+    //handling this - we don't keep track of the number of invocations by
+    //a class we are not testing here (TrafficController)
+    //So, we'll reset this mock. This is not a problem with other mocks.
+    reset(privilegedOperationExecutorMock);
+
+    doReturn(TEST_CLASSID).when(trafficControllerSpy).getNextClassId();
+    doReturn(TEST_CLASSID_STR).when(trafficControllerSpy)
+        .getStringForNetClsClassId(TEST_CLASSID);
+    when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler
+        .CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn(
+        TEST_TASKS_FILE);
+
+    List<PrivilegedOperation> ops = handlerImpl.preStart(containerMock);
+
+    //Ensure that cgroups is created and updated as expected
+    verify(cGroupsHandlerMock).createCGroup(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+    verify(cGroupsHandlerMock).updateCGroupParam(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR),
+        eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID_STR));
+
+    //Now check the privileged operations being returned
+    //We expect two operations - one for adding pid to tasks file and another
+    //for a tc modify operation
+    Assert.assertEquals(2, ops.size());
+
+    //Verify that the add pid op is correct
+    PrivilegedOperation addPidOp = ops.get(0);
+    String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX +
+        TEST_TASKS_FILE;
+    List<String> addPidOpArgs = addPidOp.getArguments();
+
+    Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        addPidOp.getOperationType());
+    Assert.assertEquals(1, addPidOpArgs.size());
+    Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0));
+
+    //Verify that the tc modify op is correct
+    PrivilegedOperation tcModifyOp = ops.get(1);
+    List<String> tcModifyOpArgs = tcModifyOp.getArguments();
+
+    Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+        tcModifyOp.getOperationType());
+    Assert.assertEquals(1, tcModifyOpArgs.size());
+    //verify that the tc command file exists
+    Assert.assertTrue(new File(tcModifyOpArgs.get(0)).exists());
+  }
+
+  private void testPostComplete(TrafficController trafficControllerSpy,
+      TrafficControlBandwidthHandlerImpl handlerImpl) throws
+      ResourceHandlerException {
+    //This is not the cleanest of solutions - but since we are testing the
+    //preStart/postComplete lifecycle, we don't have a different way of
+    //handling this - we don't keep track of the number of invocations by
+    //a class we are not testing here (TrafficController)
+    //So, we'll reset this mock. This is not a problem with other mocks.
+    reset(privilegedOperationExecutorMock);
+
+    List<PrivilegedOperation> ops = handlerImpl.postComplete(containerIdMock);
+
+    verify(cGroupsHandlerMock).deleteCGroup(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+
+    try {
+      //capture privileged op argument and ensure it is correct
+      ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      verify(privilegedOperationExecutorMock)
+          .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+      List<String> args = opCaptor.getValue().getArguments();
+
+      Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          opCaptor.getValue().getOperationType());
+      Assert.assertEquals(1, args.size());
+      //ensure that tc command file exists
+      Assert.assertTrue(new File(args.get(0)).exists());
+
+      verify(trafficControllerSpy).releaseClassId(TEST_CLASSID);
+    } catch (PrivilegedOperationException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail("Unexpected PrivilegedOperationException from mock!");
+    }
+
+    //We don't expect any operations to be returned here
+    Assert.assertNull(ops);
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
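
As the lifecycle test shows, preStart() does not execute anything itself; it
returns an ADD_PID_TO_CGROUP operation plus a TC_MODIFY_STATE operation for the
caller to run. A hypothetical consumer sketch (not how LinuxContainerExecutor
is actually wired in this patch), assuming only the
executePrivilegedOperation(op, grabOutput) signature mocked above:

  //Hypothetical consumer sketch, not part of this patch.
  void applyPreStartOps(TrafficControlBandwidthHandlerImpl handler,
      Container container, PrivilegedOperationExecutor executor)
      throws ResourceHandlerException, PrivilegedOperationException {
    List<PrivilegedOperation> ops = handler.preStart(container);
    if (ops != null) {
      for (PrivilegedOperation op : ops) {
        //per the assertions above: ADD_PID_TO_CGROUP first, then TC_MODIFY_STATE
        executor.executePrivilegedOperation(op, false);
      }
    }
  }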

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
new file mode 100644
index 0000000..7ea7135
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
@@ -0,0 +1,327 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestTrafficController {
+  private static final Log LOG = LogFactory.getLog(TestTrafficController.class);
+  private static final int ROOT_BANDWIDTH_MBIT = 100;
+  private static final int YARN_BANDWIDTH_MBIT = 70;
+  private static final int CONTAINER_BANDWIDTH_MBIT = 10;
+
+  //These constants are closely tied to the implementation of TrafficController
+  //and will have to be modified in tandem with any related TrafficController
+  //changes.
+  private static final String DEVICE = "eth0";
+  private static final String WIPE_STATE_CMD = "qdisc del dev eth0 parent root";
+  private static final String ADD_ROOT_QDISC_CMD =
+      "qdisc add dev eth0 root handle 42: htb default 2";
+  private static final String ADD_CGROUP_FILTER_CMD =
+      "filter add dev eth0 parent 42: protocol ip prio 10 handle 1: cgroup";
+  private static final String ADD_ROOT_CLASS_CMD =
+      "class add dev eth0 parent 42:0 classid 42:1 htb rate 100mbit ceil 100mbit";
+  private static final String ADD_DEFAULT_CLASS_CMD =
+      "class add dev eth0 parent 42:1 classid 42:2 htb rate 30mbit ceil 100mbit";
+  private static final String ADD_YARN_CLASS_CMD =
+      "class add dev eth0 parent 42:1 classid 42:3 htb rate 70mbit ceil 70mbit";
+  private static final String DEFAULT_TC_STATE_EXAMPLE =
+      "qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1";
+  private static final String READ_QDISC_CMD = "qdisc show dev eth0";
+  private static final String READ_FILTER_CMD = "filter show dev eth0";
+  private static final String READ_CLASS_CMD = "class show dev eth0";
+  private static final int MIN_CONTAINER_CLASS_ID = 4;
+  private static final String FORMAT_CONTAINER_CLASS_STR = "0x0042%04d";
+  private static final String FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE =
+      "class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit";
+  private static final String FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE =
+      "class del dev eth0 classid 42:%d";
+
+  private static final int TEST_CLASS_ID = 97;
+  //decimal form of 0x00420097 - when reading a classid file, it is read out
+  //as decimal
+  private static final String TEST_CLASS_ID_DECIMAL_STR = "4325527";
+
+  private Configuration conf;
+  private String tmpPath;
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    conf = new YarnConfiguration();
+    tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
+        ('/').append("hadoop.tmp.dir").toString();
+
+    conf.set("hadoop.tmp.dir", tmpPath);
+  }
+
+  private void verifyTrafficControlOperation(PrivilegedOperation op,
+      PrivilegedOperation.OperationType expectedOpType,
+      List<String> expectedTcCmds)
+      throws IOException {
+    //Verify that the optype matches
+    Assert.assertEquals(expectedOpType, op.getOperationType());
+
+    List<String> args = op.getArguments();
+
+    //Verify that arg count is always 1 (tc command file) for a tc operation
+    Assert.assertEquals(1, args.size());
+
+    File tcCmdsFile = new File(args.get(0));
+
+    //Verify that command file exists
+    Assert.assertTrue(tcCmdsFile.exists());
+
+    List<String> tcCmds = Files.readAllLines(tcCmdsFile.toPath(),
+        Charset.forName("UTF-8"));
+
+    //Verify that the number of commands is the same as expected and verify
+    //that each command is the same, in sequence
+    Assert.assertEquals(expectedTcCmds.size(), tcCmds.size());
+    for (int i = 0; i < tcCmds.size(); ++i) {
+      Assert.assertEquals(expectedTcCmds.get(i), tcCmds.get(i));
+    }
+  }
+
+  @Test
+  public void testBootstrapRecoveryDisabled() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      //NM_RECOVERY_DISABLED - so we expect two privileged operation executions
+      //one for wiping tc state - a second for initializing state
+      verify(privilegedOperationExecutorMock, times(2))
+          .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+      //Now verify that the two operations were correct
+      List<PrivilegedOperation> ops = opCaptor.getAllValues();
+
+      verifyTrafficControlOperation(ops.get(0),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(WIPE_STATE_CMD));
+
+      verifyTrafficControlOperation(ops.get(1),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+              ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+    } catch (ResourceHandlerException | PrivilegedOperationException |
+        IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testBootstrapRecoveryEnabled() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    try {
+      //Return a default tc state when attempting to read state
+      when(privilegedOperationExecutorMock.executePrivilegedOperation(
+          any(PrivilegedOperation.class), eq(true)))
+          .thenReturn(DEFAULT_TC_STATE_EXAMPLE);
+
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      ArgumentCaptor<PrivilegedOperation> readOpCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      //NM_RECOVERY_ENABLED - so we expect three privileged operation executions
+      //1) read tc state 2) wipe tc state 3) init tc state
+      //First, verify read op
+      verify(privilegedOperationExecutorMock, times(1))
+          .executePrivilegedOperation(readOpCaptor.capture(), eq(true));
+      List<PrivilegedOperation> readOps = readOpCaptor.getAllValues();
+      verifyTrafficControlOperation(readOps.get(0),
+          PrivilegedOperation.OperationType.TC_READ_STATE,
+          Arrays.asList(READ_QDISC_CMD, READ_FILTER_CMD, READ_CLASS_CMD));
+
+      ArgumentCaptor<PrivilegedOperation> writeOpCaptor = ArgumentCaptor
+          .forClass(PrivilegedOperation.class);
+      verify(privilegedOperationExecutorMock, times(2))
+          .executePrivilegedOperation(writeOpCaptor.capture(), eq(false));
+      //Now verify that the two write operations were correct
+      List<PrivilegedOperation> writeOps = writeOpCaptor.getAllValues();
+      verifyTrafficControlOperation(writeOps.get(0),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(WIPE_STATE_CMD));
+
+      verifyTrafficControlOperation(writeOps.get(1),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+              ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+    } catch (ResourceHandlerException | PrivilegedOperationException |
+        IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testInvalidBuilder() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      try {
+        //Invalid op type for TC batch builder
+        TrafficController.BatchBuilder invalidBuilder = trafficController.
+            new BatchBuilder(
+            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP);
+        Assert.fail("Invalid builder check failed!");
+      } catch (ResourceHandlerException e) {
+        //expected
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testClassIdFileContentParsing() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    //Verify that classid file contents are parsed correctly
+    //This call strips the QDISC prefix and returns the classid associated with
+    //the container
+    int parsedClassId = trafficController.getClassIdFromFileContents
+        (TEST_CLASS_ID_DECIMAL_STR);
+
+    Assert.assertEquals(TEST_CLASS_ID, parsedClassId);
+  }
+
+  @Test
+  public void testContainerOperations() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      int classId = trafficController.getNextClassId();
+
+      Assert.assertTrue(classId >= MIN_CONTAINER_CLASS_ID);
+      Assert.assertEquals(String.format(FORMAT_CONTAINER_CLASS_STR, classId),
+          trafficController.getStringForNetClsClassId(classId));
+
+      //Verify that the operation is setup correctly with strictMode = false
+      TrafficController.BatchBuilder builder = trafficController.
+          new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, false);
+      PrivilegedOperation addClassOp = builder.commitBatchToTempFile();
+
+      String expectedAddClassCmd = String.format
+          (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId, YARN_BANDWIDTH_MBIT);
+      verifyTrafficControlOperation(addClassOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedAddClassCmd));
+
+      //Verify that the operation is setup correctly with strictMode = true
+      TrafficController.BatchBuilder strictModeBuilder = trafficController.
+          new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, true);
+      PrivilegedOperation addClassStrictModeOp = strictModeBuilder
+          .commitBatchToTempFile();
+
+      String expectedAddClassStrictModeCmd = String.format
+          (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId,
+              CONTAINER_BANDWIDTH_MBIT);
+      verifyTrafficControlOperation(addClassStrictModeOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedAddClassStrictModeCmd));
+
+      TrafficController.BatchBuilder deleteBuilder = trafficController.new
+          BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .deleteContainerClass(classId);
+      PrivilegedOperation deleteClassOp = deleteBuilder.commitBatchToTempFile();
+
+      String expectedDeleteClassCmd = String.format
+          (FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE, classId);
+      verifyTrafficControlOperation(deleteClassOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedDeleteClassCmd));
+    } catch (ResourceHandlerException | IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
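
Taken together, the expected commands in this test spell out the traffic
shaping hierarchy the patch builds on the configured device (42: is the root
HTB qdisc handle; the figures below are the 100mbit/70mbit test values):

  - 42:1  root class: rate/ceil = total outbound bandwidth (100mbit)
  - 42:2  default class for non-YARN traffic (the qdisc's "default 2"):
          rate = the non-YARN remainder (30mbit here), ceil = total (100mbit)
  - 42:3  YARN root class: rate/ceil = YARN bandwidth (70mbit)
  - 42:4 and up: per-container classes parented to 42:3; ceil equals the
          container rate in strict mode, otherwise the YARN bandwidth
  - a cgroup filter on 42: classifies outgoing packets by the net_cls classid
    written into each container's cgroup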