You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/04/23 02:36:32 UTC
hadoop git commit: YARN-3366. Enhanced NodeManager to support
classifying/shaping outgoing network bandwidth traffic originating from YARN
containers. Contributed by Sidharta Seethana.
Repository: hadoop
Updated Branches:
refs/heads/trunk 0ebe84d30 -> a100be685
YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing network bandwidth traffic originating from YARN containers. Contributed by Sidharta Seethana.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a100be68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a100be68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a100be68
Branch: refs/heads/trunk
Commit: a100be685cc4521e9949589948219231aa5d2733
Parents: 0ebe84d
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Wed Apr 22 17:26:13 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Apr 22 17:26:13 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 4 +
.../hadoop/yarn/conf/YarnConfiguration.java | 38 +-
.../nodemanager/LinuxContainerExecutor.java | 100 ++-
.../OutboundBandwidthResourceHandler.java | 29 +
.../linux/resources/ResourceHandlerModule.java | 128 ++++
.../TrafficControlBandwidthHandlerImpl.java | 281 ++++++++
.../linux/resources/TrafficController.java | 650 +++++++++++++++++++
.../resources/TestResourceHandlerModule.java | 78 +++
.../TestTrafficControlBandwidthHandlerImpl.java | 231 +++++++
.../linux/resources/TestTrafficController.java | 327 ++++++++++
10 files changed, 1864 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 975db66..21ef32d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -96,6 +96,10 @@ Release 2.8.0 - UNRELEASED
YARN-3225. New parameter of CLI for decommissioning node gracefully in
RMAdmin CLI. (Devaraj K via junping_du)
+ YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing
+ network bandwidth traffic originating from YARN containers (Sidharta Seethana
+ via vinodkv)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 253ae08..a7f485d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -822,7 +822,43 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;
-
+
+
+ /** Common prefix for all NodeManager network-resource configuration keys. */
+ public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
+
+ /**
+ * Controls whether resource handling for network bandwidth is enabled.
+ * Work in progress: this configuration parameter may be changed/removed in
+ * the future.
+ */
+ @Private
+ public static final String NM_NETWORK_RESOURCE_ENABLED =
+ NM_NETWORK_RESOURCE_PREFIX + "enabled";
+ /** Network as a resource is disabled by default. */
+ @Private
+ public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
+
+ /**
+ * Specifies the network interface on which network throttling rules are
+ * applied.
+ * Work in progress: this configuration parameter may be changed/removed in
+ * the future.
+ */
+ @Private
+ public static final String NM_NETWORK_RESOURCE_INTERFACE =
+ NM_NETWORK_RESOURCE_PREFIX + "interface";
+ /** Default network interface used for throttling rules: "eth0". */
+ @Private
+ public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
+
+ /**
+ * Specifies the total outbound bandwidth available on the node, in
+ * mbit/sec.
+ * Work in progress: this configuration parameter may be changed/removed in
+ * the future.
+ */
+ @Private
+ public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
+ NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
+ /** Default total outbound bandwidth on the node: 1000 mbit/sec. */
+ @Private
+ public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
+
+ /**
+ * Specifies the total outbound bandwidth available to YARN containers, in
+ * mbit/sec. Defaults to the value of
+ * NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
+ * Work in progress: this configuration parameter may be changed/removed in
+ * the future.
+ */
+ @Private
+ public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
+ NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";
+
/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index d6e6894..f8da958 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -43,7 +43,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -60,6 +66,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
private boolean containerSchedPriorityIsSet = false;
private int containerSchedPriorityAdjustment = 0;
private boolean containerLimitUsers;
+ private ResourceHandler resourceHandlerChain;
@Override
public void setConf(Configuration conf) {
@@ -189,7 +196,20 @@ public class LinuxContainerExecutor extends ContainerExecutor {
throw new IOException("Linux container executor not configured properly"
+ " (error=" + exitCode + ")", e);
}
-
+
+ try {
+ Configuration conf = super.getConf();
+
+ resourceHandlerChain = ResourceHandlerModule
+ .getConfiguredResourceHandlerChain(conf);
+ if (resourceHandlerChain != null) {
+ resourceHandlerChain.bootstrap(conf);
+ }
+ } catch (ResourceHandlerException e) {
+ LOG.error("Failed to bootstrap configured resource subsystems! ", e);
+ throw new IOException("Failed to bootstrap configured resource subsystems!");
+ }
+
resourcesHandler.init(this);
}
@@ -268,6 +288,51 @@ public class LinuxContainerExecutor extends ContainerExecutor {
container.getResource());
String resourcesOptions = resourcesHandler.getResourcesOption(
containerId);
+ String tcCommandFile = null;
+
+ try {
+ if (resourceHandlerChain != null) {
+ List<PrivilegedOperation> ops = resourceHandlerChain
+ .preStart(container);
+
+ if (ops != null) {
+ List<PrivilegedOperation> resourceOps = new ArrayList<>();
+
+ resourceOps.add(new PrivilegedOperation
+ (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+ resourcesOptions));
+
+ for (PrivilegedOperation op : ops) {
+ switch (op.getOperationType()) {
+ case ADD_PID_TO_CGROUP:
+ resourceOps.add(op);
+ break;
+ case TC_MODIFY_STATE:
+ tcCommandFile = op.getArguments().get(0);
+ break;
+ default:
+ LOG.warn("PrivilegedOperation type unsupported in launch: "
+ + op.getOperationType());
+ }
+ }
+
+ if (resourceOps.size() > 1) {
+ //squash resource operations
+ try {
+ PrivilegedOperation operation = PrivilegedOperationExecutor
+ .squashCGroupOperations(resourceOps);
+ resourcesOptions = operation.getArguments().get(0);
+ } catch (PrivilegedOperationException e) {
+ LOG.error("Failed to squash cgroup operations!", e);
+ throw new ResourceHandlerException("Failed to squash cgroup operations!");
+ }
+ }
+ }
+ }
+ } catch (ResourceHandlerException e) {
+ LOG.error("ResourceHandlerChain.preStart() failed!", e);
+ throw new IOException("ResourceHandlerChain.preStart() failed!");
+ }
ShellCommandExecutor shExec = null;
@@ -286,6 +351,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs),
resourcesOptions));
+
+ if (tcCommandFile != null) {
+ command.add(tcCommandFile);
+ }
+
String[] commandArray = command.toArray(new String[command.size()]);
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env
@@ -334,6 +404,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
return exitCode;
} finally {
resourcesHandler.postExecute(containerId);
+
+ try {
+ if (resourceHandlerChain != null) {
+ resourceHandlerChain.postComplete(containerId);
+ }
+ } catch (ResourceHandlerException e) {
+ LOG.warn("ResourceHandlerChain.postComplete failed for " +
+ "containerId: " + containerId + ". Exception: " + e);
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
@@ -346,9 +425,28 @@ public class LinuxContainerExecutor extends ContainerExecutor {
public int reacquireContainer(String user, ContainerId containerId)
throws IOException, InterruptedException {
try {
+ //Resource handler chain needs to reacquire container state
+ //as well
+ if (resourceHandlerChain != null) {
+ try {
+ resourceHandlerChain.reacquireContainer(containerId);
+ } catch (ResourceHandlerException e) {
+ LOG.warn("ResourceHandlerChain.reacquireContainer failed for " +
+ "containerId: " + containerId + " Exception: " + e);
+ }
+ }
+
return super.reacquireContainer(user, containerId);
} finally {
resourcesHandler.postExecute(containerId);
+ if (resourceHandlerChain != null) {
+ try {
+ resourceHandlerChain.postComplete(containerId);
+ } catch (ResourceHandlerException e) {
+ LOG.warn("ResourceHandlerChain.postComplete failed for " +
+ "containerId: " + containerId + " Exception: " + e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
new file mode 100644
index 0000000..c814c89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
@@ -0,0 +1,29 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Marker interface for resource handlers that manage the outbound network
+ * bandwidth used by containers. It declares no methods of its own beyond
+ * those inherited from {@link ResourceHandler}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface OutboundBandwidthResourceHandler extends ResourceHandler {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
new file mode 100644
index 0000000..30fc951
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -0,0 +1,128 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides mechanisms to get various resource handlers - cpu, memory, network,
+ * disk etc., - based on configuration
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ResourceHandlerModule {
+ private volatile static ResourceHandlerChain resourceHandlerChain;
+
+ /**
+ * This specific implementation might provide resource management as well
+ * as resource metrics functionality. We need to ensure that the same
+ * instance is used for both.
+ */
+ private volatile static TrafficControlBandwidthHandlerImpl
+ trafficControlBandwidthHandler;
+ private volatile static CGroupsHandler cGroupsHandler;
+
+ /**
+ * Returns an initialized, thread-safe CGroupsHandler instance
+ */
+ public static CGroupsHandler getCGroupsHandler(Configuration conf)
+ throws ResourceHandlerException {
+ if (cGroupsHandler == null) {
+ synchronized (CGroupsHandler.class) {
+ if (cGroupsHandler == null) {
+ cGroupsHandler = new CGroupsHandlerImpl(conf,
+ PrivilegedOperationExecutor.getInstance(conf));
+ }
+ }
+ }
+
+ return cGroupsHandler;
+ }
+
+ private static TrafficControlBandwidthHandlerImpl
+ getTrafficControlBandwidthHandler(Configuration conf)
+ throws ResourceHandlerException {
+ if (conf.getBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
+ YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_ENABLED)) {
+ if (trafficControlBandwidthHandler == null) {
+ synchronized (OutboundBandwidthResourceHandler.class) {
+ if (trafficControlBandwidthHandler == null) {
+ trafficControlBandwidthHandler = new
+ TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
+ .getInstance(conf), getCGroupsHandler(conf),
+ new TrafficController(conf, PrivilegedOperationExecutor
+ .getInstance(conf)));
+ }
+ }
+ }
+
+ return trafficControlBandwidthHandler;
+ } else {
+ return null;
+ }
+ }
+
+ public static OutboundBandwidthResourceHandler
+ getOutboundBandwidthResourceHandler(Configuration conf)
+ throws ResourceHandlerException {
+ return getTrafficControlBandwidthHandler(conf);
+ }
+
+ private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
+ ResourceHandler handler) {
+ if (handler != null) {
+ handlerList.add(handler);
+ }
+ }
+
+ private static void initializeConfiguredResourceHandlerChain(
+ Configuration conf) throws ResourceHandlerException {
+ ArrayList<ResourceHandler> handlerList = new ArrayList<>();
+
+ addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
+ resourceHandlerChain = new ResourceHandlerChain(handlerList);
+ }
+
+ public static ResourceHandlerChain getConfiguredResourceHandlerChain
+ (Configuration conf) throws ResourceHandlerException {
+ if (resourceHandlerChain == null) {
+ synchronized (ResourceHandlerModule.class) {
+ if (resourceHandlerChain == null) {
+ initializeConfiguredResourceHandlerChain(conf);
+ }
+ }
+ }
+
+ if (resourceHandlerChain.getResourceHandlerList().size() != 0) {
+ return resourceHandlerChain;
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
new file mode 100644
index 0000000..a0327a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
@@ -0,0 +1,281 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Outbound-bandwidth resource handler built on the net_cls cgroup controller
+ * and Linux traffic control (tc). Each container gets its own net_cls cgroup
+ * tagged with a class id, plus a tc class that shapes its outbound traffic.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TrafficControlBandwidthHandlerImpl
+    implements OutboundBandwidthResourceHandler {
+
+  private static final Log LOG = LogFactory
+      .getLog(TrafficControlBandwidthHandlerImpl.class);
+  //In the absence of 'scheduling' support, we'll 'infer' the guaranteed
+  //outbound bandwidth for each container based on this number. This will
+  //likely go away once we add support on the RM for this resource type.
+  private static final int MAX_CONTAINER_COUNT = 50;
+
+  private final PrivilegedOperationExecutor privilegedOperationExecutor;
+  private final CGroupsHandler cGroupsHandler;
+  private final TrafficController trafficController;
+  // Class id assigned to each live container. Entries are added in
+  // preStart()/reacquireContainer() and removed in postComplete(), so a
+  // released (and possibly reassigned) class id is never attributed to a
+  // container that has already completed.
+  private final ConcurrentHashMap<ContainerId, Integer> containerIdClassIdMap;
+
+  private Configuration conf;
+  private String device;
+  private boolean strictMode;
+  private int containerBandwidthMbit;
+  private int rootBandwidthMbit;
+  private int yarnBandwidthMbit;
+
+  public TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
+      privilegedOperationExecutor, CGroupsHandler cGroupsHandler,
+      TrafficController trafficController) {
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+    this.cGroupsHandler = cGroupsHandler;
+    this.trafficController = trafficController;
+    this.containerIdClassIdMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Bootstraps the 'outbound-bandwidth' resource handler. It mounts the
+   * net_cls controller, reads the bandwidth settings and bootstraps a
+   * traffic control bandwidth shaping hierarchy.
+   * @param configuration yarn configuration in use
+   * @return always {@code null}; all bootstrap work is performed inline.
+   * @throws ResourceHandlerException if mounting or tc bootstrap fails
+   */
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+    conf = configuration;
+    //We'll do this inline for the time being - since this is a one time
+    //operation. At some point, LCE code can be refactored to batch mount
+    //operations across multiple controllers - cpu, net_cls, blkio etc
+    cGroupsHandler
+        .mountCGroupController(CGroupsHandler.CGroupController.NET_CLS);
+    device = conf.get(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE,
+        YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE);
+    strictMode = configuration.getBoolean(YarnConfiguration
+        .NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, YarnConfiguration
+        .DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
+    rootBandwidthMbit = conf.getInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, YarnConfiguration
+        .DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT);
+    // The YARN share defaults to the whole node's outbound bandwidth.
+    yarnBandwidthMbit = conf.getInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, rootBandwidthMbit);
+    containerBandwidthMbit = (int) Math.ceil((double) yarnBandwidthMbit /
+        MAX_CONTAINER_COUNT);
+
+    StringBuilder logLine = new StringBuilder("strict mode is set to :")
+        .append(strictMode).append(System.lineSeparator());
+
+    if (strictMode) {
+      logLine.append("container bandwidth will be capped to soft limit.")
+          .append(System.lineSeparator());
+    } else {
+      logLine.append(
+          "containers will be allowed to use spare YARN bandwidth.")
+          .append(System.lineSeparator());
+    }
+
+    logLine
+        .append("containerBandwidthMbit soft limit (in mbit/sec) is set to : ")
+        .append(containerBandwidthMbit);
+
+    LOG.info(logLine);
+    trafficController.bootstrap(device, rootBandwidthMbit, yarnBandwidthMbit);
+
+    return null;
+  }
+
+  /**
+   * Pre-start hook for the 'outbound-bandwidth' resource. It creates a
+   * cgroup for the container and writes a newly generated net_cls classid
+   * to it. It also prepares a traffic control shaping rule that limits the
+   * container's outbound bandwidth utilization.
+   * @param container Container being launched
+   * @return privileged operations the caller must run at launch time: an
+   * ADD_PID_TO_CGROUP operation and a TC_MODIFY_STATE operation.
+   * @throws ResourceHandlerException if cgroup setup or tc batch creation
+   * fails
+   */
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+    String containerIdStr = container.getContainerId().toString();
+    int classId = trafficController.getNextClassId();
+    String classIdStr = trafficController.getStringForNetClsClassId(classId);
+
+    cGroupsHandler.createCGroup(CGroupsHandler.CGroupController
+        .NET_CLS,
+        containerIdStr);
+    cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS,
+        containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr);
+    containerIdClassIdMap.put(container.getContainerId(), classId);
+
+    //Now create a privileged operation in order to update the tasks file with
+    //the pid of the running container process (root of process tree). This can
+    //only be done at the time of launching the container, in a privileged
+    //executable.
+    String tasksFile = cGroupsHandler.getPathForCGroupTasks(
+        CGroupsHandler.CGroupController.NET_CLS, containerIdStr);
+    String opArg = PrivilegedOperation.CGROUP_ARG_PREFIX + tasksFile;
+    List<PrivilegedOperation> ops = new ArrayList<>();
+
+    ops.add(new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg));
+
+    //Create a privileged operation to create a tc rule for this container
+    //We'll return this to the calling (Linux) Container Executor
+    //implementation for batching optimizations so that we don't fork/exec
+    //additional times during container launch.
+    TrafficController.BatchBuilder builder = trafficController.new
+        BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE);
+
+    builder.addContainerClass(classId, containerBandwidthMbit, strictMode);
+    ops.add(builder.commitBatchToTempFile());
+
+    return ops;
+  }
+
+  /**
+   * Reacquires state for a container. It reads the classid from the cgroup
+   * of the container being reacquired and records the mapping.
+   * @param containerId id of the container being reacquired.
+   * @return always {@code null}; no privileged operations are needed.
+   * @throws ResourceHandlerException if the classid cannot be read
+   */
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    String containerIdStr = containerId.toString();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Attempting to reacquire classId for container: " +
+          containerIdStr);
+    }
+
+    String classIdStrFromFile = cGroupsHandler.getCGroupParam(
+        CGroupsHandler.CGroupController.NET_CLS, containerIdStr,
+        CGroupsHandler.CGROUP_PARAM_CLASSID);
+    int classId = trafficController
+        .getClassIdFromFileContents(classIdStrFromFile);
+
+    LOG.info("Reacquired containerId -> classId mapping: " + containerIdStr
+        + " -> " + classId);
+    containerIdClassIdMap.put(containerId, classId);
+
+    return null;
+  }
+
+  /**
+   * Returns the total bytes sent per container, for metrics tracking.
+   * Containers with no matching tc statistic are logged and omitted.
+   * @return a map of containerId to bytes sent
+   * @throws ResourceHandlerException if tc statistics cannot be read
+   */
+  public Map<ContainerId, Integer> getBytesSentPerContainer()
+      throws ResourceHandlerException {
+    Map<Integer, Integer> classIdStats = trafficController.readStats();
+    Map<ContainerId, Integer> containerIdStats = new HashMap<>();
+
+    for (Map.Entry<ContainerId, Integer> entry : containerIdClassIdMap
+        .entrySet()) {
+      ContainerId containerId = entry.getKey();
+      Integer classId = entry.getValue();
+      Integer bytesSent = classIdStats.get(classId);
+
+      if (bytesSent == null) {
+        LOG.warn("No bytes sent metric found for container: " + containerId +
+            " with classId: " + classId);
+        continue;
+      }
+      containerIdStats.put(containerId, bytesSent);
+    }
+
+    return containerIdStats;
+  }
+
+  /**
+   * Cleanup operations once a container has completed. It deletes the
+   * cgroup, removes the traffic shaping rule and releases the class id.
+   * The container's class id mapping is dropped even if tc cleanup fails.
+   * @param containerId of the container that was completed.
+   * @return always {@code null}; the tc cleanup is executed here directly.
+   * @throws ResourceHandlerException if the tc rule cannot be deleted
+   */
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    LOG.info("postComplete for container: " + containerId.toString());
+    cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS,
+        containerId.toString());
+
+    // Remove (not just read) the mapping. Once released, the class id can be
+    // handed to a new container. A stale entry would then misattribute that
+    // container's stats, and a repeated postComplete() would delete its tc
+    // class.
+    Integer classId = containerIdClassIdMap.remove(containerId);
+
+    if (classId != null) {
+      PrivilegedOperation op = trafficController.new
+          BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .deleteContainerClass(classId).commitBatchToTempFile();
+
+      try {
+        privilegedOperationExecutor.executePrivilegedOperation(op, false);
+        trafficController.releaseClassId(classId);
+      } catch (PrivilegedOperationException e) {
+        LOG.warn("Failed to delete tc rule for classId: " + classId, e);
+        throw new ResourceHandlerException(
+            "Failed to delete tc rule for classId:" + classId);
+      }
+    } else {
+      LOG.warn("Not cleaning up tc rules. classId unknown for container: " +
+          containerId.toString());
+    }
+
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown()
+      throws ResourceHandlerException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("teardown(): Nothing to do");
+    }
+
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
new file mode 100644
index 0000000..e33cea4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
@@ -0,0 +1,650 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Wrapper around the 'tc' tool. Provides access to a very specific subset of
+ * the functionality provided by the tc tool.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable class TrafficController {
+ private static final Log LOG = LogFactory.getLog(TrafficController.class);
+ private static final int ROOT_QDISC_HANDLE = 42;
+ private static final int ZERO_CLASS_ID = 0;
+ private static final int ROOT_CLASS_ID = 1;
+ /** Traffic shaping class used for all unclassified traffic */
+ private static final int DEFAULT_CLASS_ID = 2;
+ /** Traffic shaping class used for all YARN traffic */
+ private static final int YARN_ROOT_CLASS_ID = 3;
+ /** Classes 0-3 are used already. We need to ensure that container classes
+ * do not collide with these classids.
+ */
+ private static final int MIN_CONTAINER_CLASS_ID = 4;
+ /** This is the number of distinct (container) traffic shaping classes
+ * that are supported */
+ private static final int MAX_CONTAINER_CLASSES = 1024;
+
+ private static final String MBIT_SUFFIX = "mbit";
+ private static final String TMP_FILE_PREFIX = "tc.";
+ private static final String TMP_FILE_SUFFIX = ".cmds";
+
+ /** Root queuing discipline attached to the root of the interface */
+ private static final String FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT =
+ "qdisc add dev %s root handle %d: htb default %s";
+ /** Specifies a cgroup/classid based filter - based on the classid associated
+ * with the outbound packet, the corresponding traffic shaping rule is used
+ * . Please see tc documentation for additional details.
+ */
+ private static final String FORMAT_FILTER_CGROUP_ADD_TO_PARENT =
+ "filter add dev %s parent %d: protocol ip prio 10 handle 1: cgroup";
+ /** Standard format for adding a traffic shaping class to a parent, with
+ * the specified bandwidth limits
+ */
+ private static final String FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES =
+ "class add dev %s parent %d:%d classid %d:%d htb rate %s ceil %s";
+ /** Standard format to delete a traffic shaping class */
+ private static final String FORMAT_DELETE_CLASS =
+ "class del dev %s classid %d:%d";
+ /** Format of the classid that is to be used with the net_cls cgroup. Needs
+ * to be of the form 0xAAAABBBB */
+ private static final String FORMAT_NET_CLS_CLASS_ID = "0x%04d%04d";
+ /** Commands to read the qdsic(s)/filter(s)/class(es) associated with an
+ * interface
+ */
+ private static final String FORMAT_READ_STATE =
+ "qdisc show dev %1$s%n" +
+ "filter show dev %1$s%n" +
+ "class show dev %1$s";
+ private static final String FORMAT_READ_CLASSES = "class show dev %s";
+ /** Delete a qdisc and all its children - classes/filters etc */
+ private static final String FORMAT_WIPE_STATE =
+ "qdisc del dev %s parent root";
+
+ private final Configuration conf;
+ //Used to store the set of classids in use for container classes
+ private final BitSet classIdSet;
+ private final PrivilegedOperationExecutor privilegedOperationExecutor;
+
+ private String tmpDirPath;
+ private String device;
+ private int rootBandwidthMbit;
+ private int yarnBandwidthMbit;
+ private int defaultClassBandwidthMbit;
+
+ TrafficController(Configuration conf, PrivilegedOperationExecutor exec) {
+ this.conf = conf;
+ this.classIdSet = new BitSet(MAX_CONTAINER_CLASSES);
+ this.privilegedOperationExecutor = exec;
+ }
+
+ /**
+ * Bootstrap tc configuration
+ */
+ public void bootstrap(String device, int rootBandwidthMbit, int
+ yarnBandwidthMbit)
+ throws ResourceHandlerException {
+ if (device == null) {
+ throw new ResourceHandlerException("device cannot be null!");
+ }
+
+ String tmpDirBase = conf.get("hadoop.tmp.dir");
+ if (tmpDirBase == null) {
+ throw new ResourceHandlerException("hadoop.tmp.dir not set!");
+ }
+ tmpDirPath = tmpDirBase + "/nm-tc-rules";
+
+ File tmpDir = new File(tmpDirPath);
+ if (!(tmpDir.exists() || tmpDir.mkdirs())) {
+ LOG.warn("Unable to create directory: " + tmpDirPath);
+ throw new ResourceHandlerException("Unable to create directory: " +
+ tmpDirPath);
+ }
+
+ this.device = device;
+ this.rootBandwidthMbit = rootBandwidthMbit;
+ this.yarnBandwidthMbit = yarnBandwidthMbit;
+ defaultClassBandwidthMbit = (rootBandwidthMbit - yarnBandwidthMbit) <= 0
+ ? rootBandwidthMbit : (rootBandwidthMbit - yarnBandwidthMbit);
+
+ boolean recoveryEnabled = conf.getBoolean(YarnConfiguration
+ .NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
+ String state = null;
+
+ if (!recoveryEnabled) {
+ LOG.info("NM recovery is not enabled. We'll wipe tc state before proceeding.");
+ } else {
+ //NM recovery enabled - run a state check
+ state = readState();
+ if (checkIfAlreadyBootstrapped(state)) {
+ LOG.info("TC configuration is already in place. Not wiping state.");
+
+ //We already have the list of existing container classes, if any
+ //that were created after bootstrapping
+ reacquireContainerClasses(state);
+ return;
+ } else {
+ LOG.info("TC configuration is incomplete. Wiping tc state before proceeding");
+ }
+ }
+
+ wipeState(); //start over in case preview bootstrap was incomplete
+ initializeState();
+ }
+
+ private void initializeState() throws ResourceHandlerException {
+ LOG.info("Initializing tc state.");
+
+ BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+ OperationType.TC_MODIFY_STATE)
+ .addRootQDisc()
+ .addCGroupFilter()
+ .addClassToRootQDisc(rootBandwidthMbit)
+ .addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit)
+ //yarn bandwidth is capped with rate = ceil
+ .addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit);
+ PrivilegedOperation op = builder.commitBatchToTempFile();
+
+ try {
+ privilegedOperationExecutor.executePrivilegedOperation(op, false);
+ } catch (PrivilegedOperationException e) {
+ LOG.warn("Failed to bootstrap outbound bandwidth configuration");
+
+ throw new ResourceHandlerException(
+ "Failed to bootstrap outbound bandwidth configuration", e);
+ }
+ }
+
+ /**
+ * Function to check if the interface in use has already been fully
+ * bootstrapped with the required tc configuration
+ *
+ * @return boolean indicating the result of the check
+ */
+ private boolean checkIfAlreadyBootstrapped(String state)
+ throws ResourceHandlerException {
+ List<String> regexes = new ArrayList<>();
+
+ //root qdisc
+ regexes.add(String.format("^qdisc htb %d: root(.)*$",
+ ROOT_QDISC_HANDLE));
+ //cgroup filter
+ regexes.add(String.format("^filter parent %d: protocol ip " +
+ "(.)*cgroup(.)*$", ROOT_QDISC_HANDLE));
+ //root, default and yarn classes
+ regexes.add(String.format("^class htb %d:%d root(.)*$",
+ ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
+ regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
+ ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
+ regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
+ ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE,
+ ROOT_CLASS_ID));
+
+ for (String regex : regexes) {
+ Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
+
+ if (pattern.matcher(state).find()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Matched regex: " + regex);
+ }
+ } else {
+ String logLine = new StringBuffer("Failed to match regex: ")
+ .append(regex).append(" Current state: ").append(state).toString();
+ LOG.warn(logLine);
+ return false;
+ }
+ }
+
+ LOG.info("Bootstrap check succeeded");
+
+ return true;
+ }
+
+ private String readState() throws ResourceHandlerException {
+ //Sample state output:
+ // qdisc htb 42: root refcnt 2 r2q 10 default 2 direct_packets_stat 0
+ // filter parent 42: protocol ip pref 10 cgroup handle 0x1
+ //
+ // filter parent 42: protocol ip pref 10 cgroup handle 0x1
+ //
+ // class htb 42:1 root rate 10000Kbit ceil 10000Kbit burst 1600b cburst 1600b
+ // class htb 42:2 parent 42:1 prio 0 rate 3000Kbit ceil 10000Kbit burst 1599b cburst 1600b
+ // class htb 42:3 parent 42:1 prio 0 rate 7000Kbit ceil 7000Kbit burst 1598b cburst 1598b
+
+ BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+ OperationType.TC_READ_STATE)
+ .readState();
+ PrivilegedOperation op = builder.commitBatchToTempFile();
+
+ try {
+ String output =
+ privilegedOperationExecutor.executePrivilegedOperation(op, true);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TC state: %n" + output);
+ }
+
+ return output;
+ } catch (PrivilegedOperationException e) {
+ LOG.warn("Failed to bootstrap outbound bandwidth rules");
+ throw new ResourceHandlerException(
+ "Failed to bootstrap outbound bandwidth rules", e);
+ }
+ }
+
+ private void wipeState() throws ResourceHandlerException {
+ BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+ OperationType.TC_MODIFY_STATE)
+ .wipeState();
+ PrivilegedOperation op = builder.commitBatchToTempFile();
+
+ try {
+ LOG.info("Wiping tc state.");
+ privilegedOperationExecutor.executePrivilegedOperation(op, false);
+ } catch (PrivilegedOperationException e) {
+ LOG.warn("Failed to wipe tc state. This could happen if the interface" +
+ " is already in its default state. Ignoring.");
+ //Ignoring this exception. This could happen if the interface is already
+ //in its default state. For this reason we don't throw a
+ //ResourceHandlerException here.
+ }
+ }
+
+ /**
+ * Parses the current state looks for classids already in use
+ */
+ private void reacquireContainerClasses(String state) {
+ //At this point we already have already successfully passed
+ //checkIfAlreadyBootstrapped() - so we know that at least the
+ //root classes are in place.
+ String tcClassesStr = state.substring(state.indexOf("class"));
+ //one class per line - the results of the split will need to trimmed
+ String[] tcClasses = Pattern.compile("$", Pattern.MULTILINE)
+ .split(tcClassesStr);
+ Pattern tcClassPattern = Pattern.compile(String.format(
+ "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
+
+ synchronized (classIdSet) {
+ for (String tcClassSplit : tcClasses) {
+ String tcClass = tcClassSplit.trim();
+
+ if (!tcClass.isEmpty()) {
+ Matcher classMatcher = tcClassPattern.matcher(tcClass);
+ if (classMatcher.matches()) {
+ int classId = Integer.parseInt(classMatcher.group(1));
+ if (classId >= MIN_CONTAINER_CLASS_ID) {
+ classIdSet.set(classId - MIN_CONTAINER_CLASS_ID);
+ LOG.info("Reacquired container classid: " + classId);
+ }
+ } else {
+ LOG.warn("Unable to match classid in string:" + tcClass);
+ }
+ }
+ }
+ }
+ }
+
+ public Map<Integer, Integer> readStats() throws ResourceHandlerException {
+ BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+ OperationType.TC_READ_STATS)
+ .readClasses();
+ PrivilegedOperation op = builder.commitBatchToTempFile();
+
+ try {
+ String output =
+ privilegedOperationExecutor.executePrivilegedOperation(op, true);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TC stats output:" + output);
+ }
+
+ Map<Integer, Integer> classIdBytesStats = parseStatsString(output);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("classId -> bytes sent %n" + classIdBytesStats);
+ }
+
+ return classIdBytesStats;
+ } catch (PrivilegedOperationException e) {
+ LOG.warn("Failed to get tc stats");
+ throw new ResourceHandlerException("Failed to get tc stats", e);
+ }
+ }
+
+ private Map<Integer, Integer> parseStatsString(String stats) {
+ //Example class stats segment (multiple present in tc output)
+ // class htb 42:4 parent 42:3 prio 0 rate 1000Kbit ceil 7000Kbit burst1600b cburst 1598b
+ // Sent 77921300 bytes 52617 pkt (dropped 0, overlimits 0 requeues 0)
+ // rate 6973Kbit 589pps backlog 0b 39p requeues 0
+ // lended: 3753 borrowed: 22514 giants: 0
+ // tokens: -122164 ctokens: -52488
+
+ String[] lines = Pattern.compile("$", Pattern.MULTILINE)
+ .split(stats);
+ Pattern tcClassPattern = Pattern.compile(String.format(
+ "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
+ Pattern bytesPattern = Pattern.compile("Sent (\\d+) bytes.*");
+
+ int currentClassId = -1;
+ Map<Integer, Integer> containerClassIdStats = new HashMap<>();
+
+ for (String lineSplit : lines) {
+ String line = lineSplit.trim();
+
+ if (!line.isEmpty()) {
+ //Check if we encountered a stats segment for a container class
+ Matcher classMatcher = tcClassPattern.matcher(line);
+ if (classMatcher.matches()) {
+ int classId = Integer.parseInt(classMatcher.group(1));
+ if (classId >= MIN_CONTAINER_CLASS_ID) {
+ currentClassId = classId;
+ continue;
+ }
+ }
+
+ //Check if we encountered a stats line
+ Matcher bytesMatcher = bytesPattern.matcher(line);
+ if (bytesMatcher.matches()) {
+ //we found at least one class segment
+ if (currentClassId != -1) {
+ int bytes = Integer.parseInt(bytesMatcher.group(1));
+ containerClassIdStats.put(currentClassId, bytes);
+ } else {
+ LOG.warn("Matched a 'bytes sent' line outside of a class stats " +
+ "segment : " + line);
+ }
+ continue;
+ }
+
+ //skip other kinds of non-empty lines - since we aren't interested in
+ //them.
+ }
+ }
+
+ return containerClassIdStats;
+ }
+
+ /**
+ * Returns a formatted string for attaching a qdisc to the root of the
+ * device/interface. Additional qdisc
+ * parameters can be supplied - for example, the default 'class' to use for
+ * incoming packets
+ */
+ private String getStringForAddRootQDisc() {
+ return String.format(FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT, device,
+ ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID);
+ }
+
+ /**
+ * Returns a formatted string for a filter that matches packets based on the
+ * presence of net_cls classids
+ */
+ private String getStringForaAddCGroupFilter() {
+ return String.format(FORMAT_FILTER_CGROUP_ADD_TO_PARENT, device,
+ ROOT_QDISC_HANDLE);
+ }
+
+ /**
+ * Get the next available classid. This has to be released post container
+ * complete
+ */
+ public int getNextClassId() throws ResourceHandlerException {
+ synchronized (classIdSet) {
+ int index = classIdSet.nextClearBit(0);
+ if (index >= MAX_CONTAINER_CLASSES) {
+ throw new ResourceHandlerException("Reached max container classes: "
+ + MAX_CONTAINER_CLASSES);
+ }
+ classIdSet.set(index);
+ return (index + MIN_CONTAINER_CLASS_ID);
+ }
+ }
+
+ public void releaseClassId(int classId) throws ResourceHandlerException {
+ synchronized (classIdSet) {
+ int index = classId - MIN_CONTAINER_CLASS_ID;
+ if (index < 0 || index >= MAX_CONTAINER_CLASSES) {
+ throw new ResourceHandlerException("Invalid incoming classId: "
+ + classId);
+ }
+ classIdSet.clear(index);
+ }
+ }
+
+ /**
+ * Returns a formatted string representing the given classId including a
+ * handle
+ */
+ public String getStringForNetClsClassId(int classId) {
+ return String.format(FORMAT_NET_CLS_CLASS_ID, ROOT_QDISC_HANDLE, classId);
+ }
+
+ /**
+ * A value read out of net_cls.classid file is in decimal form. We need to
+ * convert to 32-bit/8 digit hex, extract the lower 16-bit/four digits
+ * as an int
+ */
+ public int getClassIdFromFileContents(String input) {
+ //convert from decimal back to fixed size hex form
+ //e.g 4325381 -> 00420005
+ String classIdStr = String.format("%08x", Integer.parseInt(input));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ClassId hex string : " + classIdStr);
+ }
+
+ //extract and return 4 digits
+ //e.g 00420005 -> 0005
+ return Integer.parseInt(classIdStr.substring(4));
+ }
+
+ /**
+ * Adds a tc class to qdisc at root
+ */
+ private String getStringForAddClassToRootQDisc(int rateMbit) {
+ String rateMbitStr = rateMbit + MBIT_SUFFIX;
+ //example : "class add dev eth0 parent 42:0 classid 42:1 htb rate 1000mbit
+ // ceil 1000mbit"
+ return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+ ROOT_QDISC_HANDLE, ZERO_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID,
+ rateMbitStr, rateMbitStr);
+ }
+
+ private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) {
+ String rateMbitStr = rateMbit + MBIT_SUFFIX;
+ String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+ //example : "class add dev eth0 parent 42:1 classid 42:2 htb rate 300mbit
+ // ceil 1000mbit"
+ return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+ ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID,
+ rateMbitStr, ceilMbitStr);
+ }
+
+ private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) {
+ String rateMbitStr = rateMbit + MBIT_SUFFIX;
+ String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+ //example : "class add dev eth0 parent 42:1 classid 42:3 htb rate 700mbit
+ // ceil 1000mbit"
+ return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+ ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID,
+ rateMbitStr, ceilMbitStr);
+ }
+
+ private String getStringForAddContainerClass(int classId, int rateMbit, int
+ ceilMbit) {
+ String rateMbitStr = rateMbit + MBIT_SUFFIX;
+ String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+ //example : "class add dev eth0 parent 42:99 classid 42:99 htb rate 50mbit
+ // ceil 700mbit"
+ return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+ ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE, classId,
+ rateMbitStr, ceilMbitStr);
+ }
+
+ private String getStringForDeleteContainerClass(int classId) {
+ //example "class del dev eth0 classid 42:7"
+ return String.format(FORMAT_DELETE_CLASS, device, ROOT_QDISC_HANDLE,
+ classId);
+ }
+
+ private String getStringForReadState() {
+ return String.format(FORMAT_READ_STATE, device);
+ }
+
+ private String getStringForReadClasses() {
+ return String.format(FORMAT_READ_CLASSES, device);
+ }
+
+ private String getStringForWipeState() {
+ return String.format(FORMAT_WIPE_STATE, device);
+ }
+
+ public class BatchBuilder {
+ final PrivilegedOperation operation;
+ final List<String> commands;
+
+ public BatchBuilder(PrivilegedOperation.OperationType opType)
+ throws ResourceHandlerException {
+ switch (opType) {
+ case TC_MODIFY_STATE:
+ case TC_READ_STATE:
+ case TC_READ_STATS:
+ operation = new PrivilegedOperation(opType, (String) null);
+ commands = new ArrayList<>();
+ break;
+ default:
+ throw new ResourceHandlerException("Not a tc operation type : " +
+ opType);
+ }
+ }
+
+ private BatchBuilder addRootQDisc() {
+ commands.add(getStringForAddRootQDisc());
+ return this;
+ }
+
+ private BatchBuilder addCGroupFilter() {
+ commands.add(getStringForaAddCGroupFilter());
+ return this;
+ }
+
+ private BatchBuilder addClassToRootQDisc(int rateMbit) {
+ commands.add(getStringForAddClassToRootQDisc(rateMbit));
+ return this;
+ }
+
+ private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit) {
+ commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit));
+ return this;
+ }
+
+ private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit) {
+ commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit));
+ return this;
+ }
+
+ public BatchBuilder addContainerClass(int classId, int rateMbit, boolean
+ strictMode) {
+ int ceilMbit;
+
+ if (strictMode) {
+ ceilMbit = rateMbit;
+ } else {
+ ceilMbit = yarnBandwidthMbit;
+ }
+
+ commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit));
+ return this;
+ }
+
+ public BatchBuilder deleteContainerClass(int classId) {
+ commands.add(getStringForDeleteContainerClass(classId));
+ return this;
+ }
+
+ private BatchBuilder readState() {
+ commands.add(getStringForReadState());
+ return this;
+ }
+
+ //We'll read all classes, but use a different tc operation type
+ //when reading stats for all these classes. Stats are fetched using a
+ //different tc cli option (-s).
+
+ private BatchBuilder readClasses() {
+ //We'll read all classes, but use a different tc operation type
+ //for reading stats for all these classes. Stats are fetched using a
+ //different tc cli option (-s).
+ commands.add(getStringForReadClasses());
+ return this;
+ }
+
+ private BatchBuilder wipeState() {
+ commands.add(getStringForWipeState());
+ return this;
+ }
+
+ public PrivilegedOperation commitBatchToTempFile()
+ throws ResourceHandlerException {
+ try {
+ File tcCmds = File.createTempFile(TMP_FILE_PREFIX, TMP_FILE_SUFFIX, new
+ File(tmpDirPath));
+ Writer writer = new OutputStreamWriter(new FileOutputStream(tcCmds),
+ "UTF-8");
+ PrintWriter printWriter = new PrintWriter(writer);
+
+ for (String command : commands) {
+ printWriter.println(command);
+ }
+
+ printWriter.close();
+ operation.appendArgs(tcCmds.getAbsolutePath());
+
+ return operation;
+ } catch (IOException e) {
+ LOG.warn("Failed to create or write to temporary file in dir: " +
+ tmpDirPath);
+ throw new ResourceHandlerException(
+ "Failed to create or write to temporary file in dir: "
+ + tmpDirPath);
+ }
+ }
+ } //end BatchBuilder
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
new file mode 100644
index 0000000..939dfe7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
@@ -0,0 +1,78 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestResourceHandlerModule {
+ private static final Log LOG = LogFactory.
+ getLog(TestResourceHandlerModule.class);
+ Configuration emptyConf;
+ Configuration networkEnabledConf;
+
+ @Before
+ public void setup() {
+ emptyConf = new YarnConfiguration();
+ networkEnabledConf = new YarnConfiguration();
+
+ networkEnabledConf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
+ true);
+ //We need to bypass mtab parsing for figuring out cgroups mount locations
+ networkEnabledConf.setBoolean(YarnConfiguration
+ .NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
+ }
+
+ @Test
+ public void testOutboundBandwidthHandler() {
+ try {
+ //This resourceHandler should be non-null only if network as a resource
+ //is explicitly enabled
+ OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule
+ .getOutboundBandwidthResourceHandler(emptyConf);
+ Assert.assertNull(resourceHandler);
+
+ //When network as a resource is enabled this should be non-null
+ resourceHandler = ResourceHandlerModule
+ .getOutboundBandwidthResourceHandler(networkEnabledConf);
+ Assert.assertNotNull(resourceHandler);
+
+ //Ensure that outbound bandwidth resource handler is present in the chain
+ ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule
+ .getConfiguredResourceHandlerChain(networkEnabledConf);
+ List<ResourceHandler> resourceHandlers = resourceHandlerChain
+ .getResourceHandlerList();
+ //Exactly one resource handler in chain
+ Assert.assertEquals(resourceHandlers.size(), 1);
+ //Same instance is expected to be in the chain.
+ Assert.assertTrue(resourceHandlers.get(0) == resourceHandler);
+ } catch (ResourceHandlerException e) {
+ Assert.fail("Unexpected ResourceHandlerException: " + e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
new file mode 100644
index 0000000..50ad6b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
@@ -0,0 +1,231 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.util.List;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+ /**
+  * Tests for {@link TrafficControlBandwidthHandlerImpl}: bootstrap behavior and
+  * the preStart/postComplete container lifecycle.
+  */
+ public class TestTrafficControlBandwidthHandlerImpl {
+   private static final Log LOG =
+       LogFactory.getLog(TestTrafficControlBandwidthHandlerImpl.class);
+   private static final int ROOT_BANDWIDTH_MBIT = 100;
+   private static final int YARN_BANDWIDTH_MBIT = 70;
+   private static final int TEST_CLASSID = 100;
+   private static final String TEST_CLASSID_STR = "42:100";
+   private static final String TEST_CONTAINER_ID_STR = "container_01";
+   private static final String TEST_TASKS_FILE = "testTasksFile";
+   /** Fallback used when the "test.build.data" system property is unset. */
+   private static final String DEFAULT_TEST_BUILD_DATA = "target/test/data";
+
+   private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+   private CGroupsHandler cGroupsHandlerMock;
+   private TrafficController trafficControllerMock;
+   private Configuration conf;
+   private String tmpPath;
+   private String device;
+   ContainerId containerIdMock;
+   Container containerMock;
+
+   @Before
+   public void setup() {
+     privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+     cGroupsHandlerMock = mock(CGroupsHandler.class);
+     trafficControllerMock = mock(TrafficController.class);
+     conf = new YarnConfiguration();
+     //Fall back to a default directory when test.build.data is not set (e.g.
+     //when run outside the build) instead of failing with an NPE.
+     tmpPath = System.getProperty("test.build.data", DEFAULT_TEST_BUILD_DATA)
+         + '/' + "hadoop.tmp.dir";
+     device = YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE;
+     containerIdMock = mock(ContainerId.class);
+     containerMock = mock(Container.class);
+     when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR);
+     //mock returning a mock - an angel died somewhere.
+     when(containerMock.getContainerId()).thenReturn(containerIdMock);
+
+     conf.setInt(YarnConfiguration
+         .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, ROOT_BANDWIDTH_MBIT);
+     conf.setInt(YarnConfiguration
+         .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, YARN_BANDWIDTH_MBIT);
+     conf.set("hadoop.tmp.dir", tmpPath);
+     //In these tests, we'll only use TrafficController with recovery disabled
+     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+   }
+
+   /**
+    * Bootstrapping must mount the net_cls controller and bootstrap the
+    * traffic controller with the configured device and bandwidths.
+    */
+   @Test
+   public void testBootstrap() {
+     TrafficControlBandwidthHandlerImpl handlerImpl = new
+         TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+         cGroupsHandlerMock, trafficControllerMock);
+
+     try {
+       handlerImpl.bootstrap(conf);
+       verify(cGroupsHandlerMock).mountCGroupController(
+           eq(CGroupsHandler.CGroupController.NET_CLS));
+       verifyNoMoreInteractions(cGroupsHandlerMock);
+       verify(trafficControllerMock).bootstrap(eq(device),
+           eq(ROOT_BANDWIDTH_MBIT),
+           eq(YARN_BANDWIDTH_MBIT));
+       verifyNoMoreInteractions(trafficControllerMock);
+     } catch (ResourceHandlerException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Unexpected exception", e);
+       Assert.fail("Caught unexpected ResourceHandlerException!");
+     }
+   }
+
+   /**
+    * Runs bootstrap, then preStart and postComplete for one container, using
+    * a spy over a real TrafficController.
+    */
+   @Test
+   public void testLifeCycle() {
+     TrafficController trafficControllerSpy = spy(new TrafficController(conf,
+         privilegedOperationExecutorMock));
+     TrafficControlBandwidthHandlerImpl handlerImpl = new
+         TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+         cGroupsHandlerMock, trafficControllerSpy);
+
+     try {
+       handlerImpl.bootstrap(conf);
+       testPreStart(trafficControllerSpy, handlerImpl);
+       testPostComplete(trafficControllerSpy, handlerImpl);
+     } catch (ResourceHandlerException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Unexpected exception", e);
+       Assert.fail("Caught unexpected ResourceHandlerException!");
+     }
+   }
+
+   /**
+    * Verifies preStart: the container's net_cls cgroup is created and tagged
+    * with the class id, and two privileged operations are returned (add pid to
+    * cgroup, then a tc modify operation backed by an existing command file).
+    */
+   private void testPreStart(TrafficController trafficControllerSpy,
+       TrafficControlBandwidthHandlerImpl handlerImpl) throws
+       ResourceHandlerException {
+     //This is not the cleanest of solutions - but since we are testing the
+     //preStart/postComplete lifecycle, we don't have a different way of
+     //handling this - we don't keep track of the number of invocations by
+     //a class we are not testing here (TrafficController)
+     //So, we'll reset this mock. This is not a problem with other mocks.
+     reset(privilegedOperationExecutorMock);
+
+     doReturn(TEST_CLASSID).when(trafficControllerSpy).getNextClassId();
+     doReturn(TEST_CLASSID_STR).when(trafficControllerSpy)
+         .getStringForNetClsClassId(TEST_CLASSID);
+     when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler
+         .CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn(
+         TEST_TASKS_FILE);
+
+     List<PrivilegedOperation> ops = handlerImpl.preStart(containerMock);
+
+     //Ensure that cgroups is created and updated as expected
+     verify(cGroupsHandlerMock).createCGroup(
+         eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+     verify(cGroupsHandlerMock).updateCGroupParam(
+         eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR),
+         eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID_STR));
+
+     //Now check the privileged operations being returned
+     //We expect two operations - one for adding pid to tasks file and another
+     //for a tc modify operation
+     Assert.assertEquals(2, ops.size());
+
+     //Verify that the add pid op is correct
+     PrivilegedOperation addPidOp = ops.get(0);
+     String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX +
+         TEST_TASKS_FILE;
+     List<String> addPidOpArgs = addPidOp.getArguments();
+
+     Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+         addPidOp.getOperationType());
+     Assert.assertEquals(1, addPidOpArgs.size());
+     Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0));
+
+     //Verify that the tc modify op is correct
+     PrivilegedOperation tcModifyOp = ops.get(1);
+     List<String> tcModifyOpArgs = tcModifyOp.getArguments();
+
+     Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+         tcModifyOp.getOperationType());
+     Assert.assertEquals(1, tcModifyOpArgs.size());
+     //verify that the tc command file exists
+     Assert.assertTrue("tc command file does not exist: "
+         + tcModifyOpArgs.get(0), new File(tcModifyOpArgs.get(0)).exists());
+   }
+
+   /**
+    * Verifies postComplete: the container's net_cls cgroup is deleted, a tc
+    * modify operation is executed directly (not returned), the class id is
+    * released, and no operations are returned to the caller.
+    */
+   private void testPostComplete(TrafficController trafficControllerSpy,
+       TrafficControlBandwidthHandlerImpl handlerImpl) throws
+       ResourceHandlerException {
+     //This is not the cleanest of solutions - but since we are testing the
+     //preStart/postComplete lifecycle, we don't have a different way of
+     //handling this - we don't keep track of the number of invocations by
+     //a class we are not testing here (TrafficController)
+     //So, we'll reset this mock. This is not a problem with other mocks.
+     reset(privilegedOperationExecutorMock);
+
+     List<PrivilegedOperation> ops = handlerImpl.postComplete(containerIdMock);
+
+     verify(cGroupsHandlerMock).deleteCGroup(
+         eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+
+     try {
+       //capture privileged op argument and ensure it is correct
+       ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+           (PrivilegedOperation.class);
+
+       verify(privilegedOperationExecutorMock)
+           .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+       List<String> args = opCaptor.getValue().getArguments();
+
+       Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           opCaptor.getValue().getOperationType());
+       Assert.assertEquals(1, args.size());
+       //ensure that tc command file exists
+       Assert.assertTrue("tc command file does not exist: " + args.get(0),
+           new File(args.get(0)).exists());
+
+       verify(trafficControllerSpy).releaseClassId(TEST_CLASSID);
+     } catch (PrivilegedOperationException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Caught exception", e);
+       Assert.fail("Unexpected PrivilegedOperationException from mock!");
+     }
+
+     //We don't expect any operations to be returned here
+     Assert.assertNull(ops);
+   }
+
+   @After
+   public void teardown() {
+     FileUtil.fullyDelete(new File(tmpPath));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a100be68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
new file mode 100644
index 0000000..7ea7135
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
@@ -0,0 +1,327 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+ /**
+  * Tests for {@link TrafficController}: bootstrap with recovery on/off,
+  * batch-builder validation, classid parsing and per-container tc commands.
+  */
+ public class TestTrafficController {
+   private static final Log LOG = LogFactory.getLog(TestTrafficController.class);
+   private static final int ROOT_BANDWIDTH_MBIT = 100;
+   private static final int YARN_BANDWIDTH_MBIT = 70;
+   private static final int CONTAINER_BANDWIDTH_MBIT = 10;
+   /** Fallback used when the "test.build.data" system property is unset. */
+   private static final String DEFAULT_TEST_BUILD_DATA = "target/test/data";
+
+   //These constants are closely tied to the implementation of TrafficController
+   //and will have to be modified in tandem with any related TrafficController
+   //changes.
+   private static final String DEVICE = "eth0";
+   private static final String WIPE_STATE_CMD = "qdisc del dev eth0 parent root";
+   private static final String ADD_ROOT_QDISC_CMD =
+       "qdisc add dev eth0 root handle 42: htb default 2";
+   private static final String ADD_CGROUP_FILTER_CMD =
+       "filter add dev eth0 parent 42: protocol ip prio 10 handle 1: cgroup";
+   private static final String ADD_ROOT_CLASS_CMD =
+       "class add dev eth0 parent 42:0 classid 42:1 htb rate 100mbit ceil 100mbit";
+   private static final String ADD_DEFAULT_CLASS_CMD =
+       "class add dev eth0 parent 42:1 classid 42:2 htb rate 30mbit ceil 100mbit";
+   private static final String ADD_YARN_CLASS_CMD =
+       "class add dev eth0 parent 42:1 classid 42:3 htb rate 70mbit ceil 70mbit";
+   private static final String DEFAULT_TC_STATE_EXAMPLE =
+       "qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1";
+   private static final String READ_QDISC_CMD = "qdisc show dev eth0";
+   private static final String READ_FILTER_CMD = "filter show dev eth0";
+   private static final String READ_CLASS_CMD = "class show dev eth0";
+   private static final int MIN_CONTAINER_CLASS_ID = 4;
+   private static final String FORMAT_CONTAINER_CLASS_STR = "0x0042%04d";
+   private static final String FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE =
+       "class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit";
+   private static final String FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE =
+       "class del dev eth0 classid 42:%d";
+
+   private static final int TEST_CLASS_ID = 97;
+   //decimal form of 0x00420097 - when reading a classid file, it is read out
+   //as decimal
+   private static final String TEST_CLASS_ID_DECIMAL_STR = "4325527";
+
+   private Configuration conf;
+   private String tmpPath;
+
+   private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+
+   @Before
+   public void setup() {
+     privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+     conf = new YarnConfiguration();
+     //Fall back to a default directory when test.build.data is not set (e.g.
+     //when run outside the build) instead of failing with an NPE.
+     tmpPath = System.getProperty("test.build.data", DEFAULT_TEST_BUILD_DATA)
+         + '/' + "hadoop.tmp.dir";
+
+     conf.set("hadoop.tmp.dir", tmpPath);
+   }
+
+   /**
+    * Asserts that {@code op} has the expected operation type and carries a
+    * single argument naming an existing tc command file whose lines are
+    * exactly {@code expectedTcCmds}, in order.
+    */
+   private void verifyTrafficControlOperation(PrivilegedOperation op,
+       PrivilegedOperation.OperationType expectedOpType,
+       List<String> expectedTcCmds)
+       throws IOException {
+     //Verify that the optype matches
+     Assert.assertEquals(expectedOpType, op.getOperationType());
+
+     List<String> args = op.getArguments();
+
+     //Verify that arg count is always 1 (tc command file) for a tc operation
+     Assert.assertEquals(1, args.size());
+
+     File tcCmdsFile = new File(args.get(0));
+
+     //Verify that command file exists
+     Assert.assertTrue("tc command file does not exist: " + tcCmdsFile,
+         tcCmdsFile.exists());
+
+     List<String> tcCmds = Files.readAllLines(tcCmdsFile.toPath(),
+         Charset.forName("UTF-8"));
+
+     //List equality checks both the number of commands and each command in
+     //sequence, and reports both full lists on failure.
+     Assert.assertEquals(expectedTcCmds, tcCmds);
+   }
+
+   @Test
+   public void testBootstrapRecoveryDisabled() {
+     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+     TrafficController trafficController = new TrafficController(conf,
+         privilegedOperationExecutorMock);
+
+     try {
+       trafficController
+           .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+       ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+           (PrivilegedOperation.class);
+
+       //NM_RECOVERY_DISABLED - so we expect two privileged operation executions
+       //one for wiping tc state - a second for initializing state
+       verify(privilegedOperationExecutorMock, times(2))
+           .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+       //Now verify that the two operations were correct
+       List<PrivilegedOperation> ops = opCaptor.getAllValues();
+
+       verifyTrafficControlOperation(ops.get(0),
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(WIPE_STATE_CMD));
+
+       verifyTrafficControlOperation(ops.get(1),
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+               ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+     } catch (ResourceHandlerException | PrivilegedOperationException |
+         IOException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Unexpected exception", e);
+       Assert.fail("Caught unexpected exception: "
+           + e.getClass().getSimpleName());
+     }
+   }
+
+   @Test
+   public void testBootstrapRecoveryEnabled() {
+     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+     TrafficController trafficController = new TrafficController(conf,
+         privilegedOperationExecutorMock);
+
+     try {
+       //Return a default tc state when attempting to read state
+       when(privilegedOperationExecutorMock.executePrivilegedOperation(
+           any(PrivilegedOperation.class), eq(true)))
+           .thenReturn(DEFAULT_TC_STATE_EXAMPLE);
+
+       trafficController
+           .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+       ArgumentCaptor<PrivilegedOperation> readOpCaptor = ArgumentCaptor.forClass
+           (PrivilegedOperation.class);
+
+       //NM_RECOVERY_ENABLED - so we expect three privileged operation executions
+       //1) read tc state 2) wipe tc state 3) init tc state
+       //First, verify the read op (the only execution that requests output)
+       verify(privilegedOperationExecutorMock, times(1))
+           .executePrivilegedOperation(readOpCaptor.capture(), eq(true));
+       List<PrivilegedOperation> readOps = readOpCaptor.getAllValues();
+       verifyTrafficControlOperation(readOps.get(0),
+           PrivilegedOperation.OperationType.TC_READ_STATE,
+           Arrays.asList(READ_QDISC_CMD, READ_FILTER_CMD, READ_CLASS_CMD));
+
+       ArgumentCaptor<PrivilegedOperation> writeOpCaptor = ArgumentCaptor
+           .forClass(PrivilegedOperation.class);
+       verify(privilegedOperationExecutorMock, times(2))
+           .executePrivilegedOperation(writeOpCaptor.capture(), eq(false));
+       //Now verify that the two write operations were correct
+       List<PrivilegedOperation> writeOps = writeOpCaptor.getAllValues();
+       verifyTrafficControlOperation(writeOps.get(0),
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(WIPE_STATE_CMD));
+
+       verifyTrafficControlOperation(writeOps.get(1),
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+               ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+     } catch (ResourceHandlerException | PrivilegedOperationException |
+         IOException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Unexpected exception", e);
+       Assert.fail("Caught unexpected exception: "
+           + e.getClass().getSimpleName());
+     }
+   }
+
+   @Test
+   public void testInvalidBuilder() {
+     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+     TrafficController trafficController = new TrafficController(conf,
+         privilegedOperationExecutorMock);
+     try {
+       trafficController
+           .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+       try {
+         //Invalid op type for TC batch builder - construction alone must throw
+         trafficController.new BatchBuilder(
+             PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP);
+         Assert.fail("Invalid builder check failed!");
+       } catch (ResourceHandlerException e) {
+         //expected
+       }
+     } catch (ResourceHandlerException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Unexpected exception", e);
+       Assert.fail("Caught unexpected exception: "
+           + e.getClass().getSimpleName());
+     }
+   }
+
+   @Test
+   public void testClassIdFileContentParsing() {
+     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+     TrafficController trafficController = new TrafficController(conf,
+         privilegedOperationExecutorMock);
+
+     //Verify that classid file contents are parsed correctly
+     //This call strips the QDISC prefix and returns the classid associated
+     //with the container
+     int parsedClassId = trafficController.getClassIdFromFileContents
+         (TEST_CLASS_ID_DECIMAL_STR);
+
+     Assert.assertEquals(TEST_CLASS_ID, parsedClassId);
+   }
+
+   @Test
+   public void testContainerOperations() {
+     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+     TrafficController trafficController = new TrafficController(conf,
+         privilegedOperationExecutorMock);
+     try {
+       trafficController
+           .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+       int classId = trafficController.getNextClassId();
+
+       Assert.assertTrue(classId >= MIN_CONTAINER_CLASS_ID);
+       Assert.assertEquals(String.format(FORMAT_CONTAINER_CLASS_STR, classId),
+           trafficController.getStringForNetClsClassId(classId));
+
+       //Verify that the operation is setup correctly with strictMode = false
+       //(ceil is the full YARN bandwidth)
+       TrafficController.BatchBuilder builder = trafficController.
+           new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+           .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, false);
+       PrivilegedOperation addClassOp = builder.commitBatchToTempFile();
+
+       String expectedAddClassCmd = String.format
+           (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId, YARN_BANDWIDTH_MBIT);
+       verifyTrafficControlOperation(addClassOp,
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(expectedAddClassCmd));
+
+       //Verify that the operation is setup correctly with strictMode = true
+       //(ceil is capped at the container's own bandwidth)
+       TrafficController.BatchBuilder strictModeBuilder = trafficController.
+           new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+           .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, true);
+       PrivilegedOperation addClassStrictModeOp = strictModeBuilder
+           .commitBatchToTempFile();
+
+       String expectedAddClassStrictModeCmd = String.format
+           (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId,
+               CONTAINER_BANDWIDTH_MBIT);
+       verifyTrafficControlOperation(addClassStrictModeOp,
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(expectedAddClassStrictModeCmd));
+
+       TrafficController.BatchBuilder deleteBuilder = trafficController.new
+           BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+           .deleteContainerClass(classId);
+       PrivilegedOperation deleteClassOp = deleteBuilder.commitBatchToTempFile();
+
+       String expectedDeleteClassCmd = String.format
+           (FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE, classId);
+       verifyTrafficControlOperation(deleteClassOp,
+           PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+           Arrays.asList(expectedDeleteClassCmd));
+     } catch (ResourceHandlerException | IOException e) {
+       //Log with the throwable so the stack trace is not lost
+       LOG.error("Unexpected exception", e);
+       Assert.fail("Caught unexpected exception: "
+           + e.getClass().getSimpleName());
+     }
+   }
+
+   @After
+   public void teardown() {
+     FileUtil.fullyDelete(new File(tmpPath));
+   }
+ }