You are viewing a plain-text version of this content. The canonical (HTML) version is available on the original mailing-list archive page.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/28 22:27:41 UTC
[14/24] hadoop git commit: Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."
Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."
This reverts commit c1362b68af03c546f4c0802758e9729d2372ab6c.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f8851333
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f8851333
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f8851333
Branch: refs/heads/YARN-1011
Commit: f8851333e8bf5e7d25b12839c5493c48de4046f8
Parents: 879a5e7
Author: Haibo Chen <ha...@apache.org>
Authored: Tue May 8 13:50:06 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 28 14:14:58 2018 -0700
----------------------------------------------------------------------
.../nodemanager/NodeStatusUpdaterImpl.java | 2 +-
.../containermanager/ContainerManagerImpl.java | 8 +-
.../launcher/ContainerLaunch.java | 2 +-
.../launcher/ContainersLauncher.java | 9 +-
.../monitor/ContainersMonitor.java | 38 +-
.../monitor/ContainersMonitorImpl.java | 56 +-
.../AllocationBasedResourceTracker.java | 114 --
...locationBasedResourceUtilizationTracker.java | 158 +++
.../scheduler/ContainerScheduler.java | 317 ++---
.../scheduler/ContainerSchedulerEventType.java | 4 +-
.../scheduler/NMAllocationPolicy.java | 63 -
.../scheduler/ResourceUtilizationTracker.java | 17 +-
.../SnapshotBasedOverAllocationPolicy.java | 54 -
.../UtilizationBasedResourceTracker.java | 95 --
.../BaseContainerManagerTest.java | 35 -
.../TestContainersMonitorResourceChange.java | 9 +-
.../TestAllocationBasedResourceTracker.java | 82 --
...locationBasedResourceUtilizationTracker.java | 93 ++
.../TestContainerSchedulerRecovery.java | 58 +-
...estContainerSchedulerWithOverAllocation.java | 1121 ------------------
20 files changed, 419 insertions(+), 1916 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index d757376..572684e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -543,7 +543,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private ResourceUtilization getContainersUtilization() {
ContainersMonitor containersMonitor =
this.context.getContainerManager().getContainersMonitor();
- return containersMonitor.getContainersUtilization(false).getUtilization();
+ return containersMonitor.getContainersUtilization();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index a08e227..27a7c80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -245,12 +245,6 @@ public class ContainerManagerImpl extends CompositeService implements
metrics);
addService(rsrcLocalizationSrvc);
- this.containersMonitor = createContainersMonitor(exec);
- addService(this.containersMonitor);
-
- // ContainersLauncher must be added after ContainersMonitor
- // because the former depends on the latter to initialize
- // over-allocation first.
containersLauncher = createContainersLauncher(context, exec);
addService(containersLauncher);
@@ -275,6 +269,8 @@ public class ContainerManagerImpl extends CompositeService implements
nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher);
}
+ this.containersMonitor = createContainersMonitor(exec);
+ addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 0228332..6347d4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1079,7 +1079,7 @@ public class ContainerLaunch implements Callable<Integer> {
* @return Process ID
* @throws Exception
*/
- protected String getContainerPid(Path pidFilePath) throws Exception {
+ private String getContainerPid(Path pidFilePath) throws Exception {
String containerIdStr =
container.getContainerId().toString();
String processId = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 2f5acfa..7870f86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -121,7 +121,8 @@ public class ContainersLauncher extends AbstractService
containerId.getApplicationAttemptId().getApplicationId());
ContainerLaunch launch =
- createContainerLaunch(app, event.getContainer());
+ new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
+ event.getContainer(), dirsHandler, containerManager);
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
@@ -224,10 +225,4 @@ public class ContainersLauncher extends AbstractService
break;
}
}
-
- protected ContainerLaunch createContainerLaunch(
- Application app, Container container) {
- return new ContainerLaunch(context, getConfig(), dispatcher,
- exec, app, container, dirsHandler, containerManager);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 8da4ec4..64831e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -23,24 +23,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView {
-
- /**
- * Get the aggregate resource utilization of containers running on the node,
- * with a timestamp of the measurement.
- * @param latest true if the latest result should be returned
- * @return ResourceUtilization resource utilization of all containers
- */
- ContainersResourceUtilization getContainersUtilization(boolean latest);
-
- /**
- * Get the policy to over-allocate containers when over-allocation is on.
- * @return null if over-allocation is turned off
- */
- NMAllocationPolicy getContainerOverAllocationPolicy();
+ ResourceUtilization getContainersUtilization();
float getVmemRatio();
@@ -80,26 +66,4 @@ public interface ContainersMonitor extends Service,
* containersMonitor.getVmemRatio());
resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
}
-
- /**
- * A snapshot of resource utilization of all containers with the timestamp.
- */
- final class ContainersResourceUtilization {
- private final ResourceUtilization utilization;
- private final long timestamp;
-
- public ContainersResourceUtilization(
- ResourceUtilization utilization, long timestamp) {
- this.utilization = utilization;
- this.timestamp = timestamp;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public ResourceUtilization getUtilization() {
- return utilization;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index a045d78..7873882 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -25,10 +25,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,9 +112,8 @@ public class ContainersMonitorImpl extends AbstractService implements
CPU, MEMORY
}
- private ContainersResourceUtilization latestContainersUtilization;
+ private ResourceUtilization containersUtilization;
- private NMAllocationPolicy overAllocationPolicy;
private ResourceThresholds overAllocationPreemptionThresholds;
private int overAlloctionPreemptionCpuCount = -1;
@@ -134,8 +129,7 @@ public class ContainersMonitorImpl extends AbstractService implements
this.monitoringThread = new MonitoringThread();
- this.latestContainersUtilization = new ContainersResourceUtilization(
- ResourceUtilization.newInstance(-1, -1, -1.0f), -1L);
+ this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
}
@Override
@@ -371,10 +365,6 @@ public class ContainersMonitorImpl extends AbstractService implements
this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance(
cpuPreemptionThreshold, memoryPreemptionThreshold);
- // TODO make this configurable
- this.overAllocationPolicy =
- createOverAllocationPolicy(resourceThresholds);
-
LOG.info("NodeManager oversubscription enabled with overallocation " +
"thresholds (memory:" + overAllocationMemoryUtilizationThreshold +
", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" +
@@ -382,11 +372,6 @@ public class ContainersMonitorImpl extends AbstractService implements
cpuPreemptionThreshold + ")");
}
- protected NMAllocationPolicy createOverAllocationPolicy(
- ResourceThresholds resourceThresholds) {
- return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
- }
-
private boolean isResourceCalculatorAvailable() {
if (resourceCalculatorPlugin == null) {
LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -670,12 +655,7 @@ public class ContainersMonitorImpl extends AbstractService implements
}
// Save the aggregated utilization of the containers
- setLatestContainersUtilization(trackedContainersUtilization);
-
- // check opportunity to start containers if over-allocation is on
- if (context.isOverAllocationEnabled()) {
- attemptToStartContainersUponLowUtilization();
- }
+ setContainersUtilization(trackedContainersUtilization);
// Publish the container utilization metrics to node manager
// metrics system.
@@ -1065,34 +1045,12 @@ public class ContainersMonitorImpl extends AbstractService implements
}
@Override
- public ContainersResourceUtilization getContainersUtilization(
- boolean latest) {
- // TODO update containerUtilization if latest is true
- return this.latestContainersUtilization;
- }
-
- @Override
- public NMAllocationPolicy getContainerOverAllocationPolicy() {
- return overAllocationPolicy;
- }
-
- private void setLatestContainersUtilization(ResourceUtilization utilization) {
- this.latestContainersUtilization = new ContainersResourceUtilization(
- utilization, System.currentTimeMillis());
+ public ResourceUtilization getContainersUtilization() {
+ return this.containersUtilization;
}
- @VisibleForTesting
- public void attemptToStartContainersUponLowUtilization() {
- if (getContainerOverAllocationPolicy() != null) {
- Resource available = getContainerOverAllocationPolicy()
- .getAvailableResources();
- if (available.getMemorySize() > 0 &&
- available.getVirtualCores() > 0) {
- eventDispatcher.getEventHandler().handle(
- new ContainerSchedulerEvent(null,
- ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
- }
- }
+ private void setContainersUtilization(ResourceUtilization utilization) {
+ this.containersUtilization = utilization;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
deleted file mode 100644
index 86b3698..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the resource utilization tracker that equates
- * resource utilization with the total resource allocated to the container.
- */
-public class AllocationBasedResourceTracker
- implements ResourceUtilizationTracker {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
-
- private static final Resource UNAVAILABLE =
- Resource.newInstance(0, 0);
-
- private ResourceUtilization containersAllocation;
- private ContainerScheduler scheduler;
-
-
- AllocationBasedResourceTracker(ContainerScheduler scheduler) {
- this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
- this.scheduler = scheduler;
- }
-
- /**
- * Get the accumulation of totally allocated resources to containers.
- * @return ResourceUtilization Resource Utilization.
- */
- @Override
- public ResourceUtilization getCurrentUtilization() {
- return this.containersAllocation;
- }
-
- /**
- * Get the amount of resources that have not been allocated to containers.
- * @return Resource resources that have not been allocated to containers.
- */
- protected Resource getUnallocatedResources() {
- // unallocated resources = node capacity - containers allocation
- // = -(container allocation - node capacity)
- ResourceUtilization allocationClone =
- ResourceUtilization.newInstance(containersAllocation);
- getContainersMonitor()
- .subtractNodeResourcesFromResourceUtilization(allocationClone);
-
- Resource unallocated = UNAVAILABLE;
- if (allocationClone.getCPU() <= 0 &&
- allocationClone.getPhysicalMemory() <= 0 &&
- allocationClone.getVirtualMemory() <= 0) {
- int cpu = Math.round(allocationClone.getCPU() *
- getContainersMonitor().getVCoresAllocatedForContainers());
- long memory = allocationClone.getPhysicalMemory();
- unallocated = Resource.newInstance(-memory, -cpu);
- }
- return unallocated;
- }
-
-
- @Override
- public Resource getAvailableResources() {
- return getUnallocatedResources();
- }
-
- /**
- * Add Container's resources to the accumulated allocation.
- * @param container Container.
- */
- @Override
- public void containerLaunched(Container container) {
- ContainersMonitor.increaseResourceUtilization(
- getContainersMonitor(), this.containersAllocation,
- container.getResource());
- }
-
- /**
- * Subtract Container's resources to the accumulated allocation.
- * @param container Container.
- */
- @Override
- public void containerReleased(Container container) {
- ContainersMonitor.decreaseResourceUtilization(
- getContainersMonitor(), this.containersAllocation,
- container.getResource());
- }
-
- public ContainersMonitor getContainersMonitor() {
- return this.scheduler.getContainersMonitor();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..6e2b617
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link ResourceUtilizationTracker} that equates
+ * resource utilization with the total resource allocated to the container.
+ */
+public class AllocationBasedResourceUtilizationTracker implements
+ ResourceUtilizationTracker {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
+
+ private ResourceUtilization containersAllocation;
+ private ContainerScheduler scheduler;
+
+ AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
+ this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Get the accumulation of totally allocated resources to a container.
+ * @return ResourceUtilization Resource Utilization.
+ */
+ @Override
+ public ResourceUtilization getCurrentUtilization() {
+ return this.containersAllocation;
+ }
+
+ /**
+ * Add Container's resources to the accumulated Utilization.
+ * @param container Container.
+ */
+ @Override
+ public void addContainerResources(Container container) {
+ ContainersMonitor.increaseResourceUtilization(
+ getContainersMonitor(), this.containersAllocation,
+ container.getResource());
+ }
+
+ /**
+ * Subtract Container's resources to the accumulated Utilization.
+ * @param container Container.
+ */
+ @Override
+ public void subtractContainerResource(Container container) {
+ ContainersMonitor.decreaseResourceUtilization(
+ getContainersMonitor(), this.containersAllocation,
+ container.getResource());
+ }
+
+ /**
+ * Check if NM has resources available currently to run the container.
+ * @param container Container.
+ * @return True, if NM has resources available currently to run the container.
+ */
+ @Override
+ public boolean hasResourcesAvailable(Container container) {
+ long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
+ return hasResourcesAvailable(pMemBytes,
+ (long) (getContainersMonitor().getVmemRatio()* pMemBytes),
+ container.getResource().getVirtualCores());
+ }
+
+ private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
+ int cpuVcores) {
+ // Check physical memory.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
+ this.containersAllocation.getPhysicalMemory(),
+ (pMemBytes >> 20),
+ (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
+ }
+ if (this.containersAllocation.getPhysicalMemory() +
+ (int) (pMemBytes >> 20) >
+ (int) (getContainersMonitor()
+ .getPmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("before vMemCheck" +
+ "[isEnabled={}, current={} + asked={} > allowed={}]",
+ getContainersMonitor().isVmemCheckEnabled(),
+ this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
+ (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
+ }
+ // Check virtual memory.
+ if (getContainersMonitor().isVmemCheckEnabled() &&
+ this.containersAllocation.getVirtualMemory() +
+ (int) (vMemBytes >> 20) >
+ (int) (getContainersMonitor()
+ .getVmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("before cpuCheck [asked={} > allowed={}]",
+ this.containersAllocation.getCPU(),
+ getContainersMonitor().getVCoresAllocatedForContainers());
+ }
+ // Check CPU. Compare using integral values of cores to avoid decimal
+ // inaccuracies.
+ if (!hasEnoughCpu(this.containersAllocation.getCPU(),
+ getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Returns whether there is enough space for coresRequested in totalCores.
+ * Converts currentAllocation usage to nearest integer count before comparing,
+ * as floats are inherently imprecise. NOTE: this calculation assumes that
+ * requested core counts must be integers, and currentAllocation core count
+ * must also be an integer.
+ *
+ * @param currentAllocation The current allocation, a float value from 0 to 1.
+ * @param totalCores The total cores in the system.
+ * @param coresRequested The number of cores requested.
+ * @return True if round(currentAllocation * totalCores) + coresRequested
+ * <= totalCores.
+ */
+ public boolean hasEnoughCpu(float currentAllocation, long totalCores,
+ int coresRequested) {
+ // Must not cast here, as it would truncate the decimal digits.
+ return Math.round(currentAllocation * totalCores)
+ + coresRequested <= totalCores;
+ }
+
+ public ContainersMonitor getContainersMonitor() {
+ return this.scheduler.getContainersMonitor();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 0bebe44..a61b9d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -49,7 +48,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,10 +80,6 @@ public class ContainerScheduler extends AbstractService implements
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedGuaranteedContainers = new LinkedHashMap<>();
- // sum of the resources requested by guaranteed containers in queue
- private final Resource guaranteedResourcesDemanded =
- Resource.newInstance(0, 0);
-
// Queue of Opportunistic Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedOpportunisticContainers = new LinkedHashMap<>();
@@ -94,10 +88,6 @@ public class ContainerScheduler extends AbstractService implements
// or paused to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
- // sum of the resources to be released by opportunistic containers that
- // have been marked to be killed or paused.
- private final Resource opportunisticResourcesToBeReleased =
- Resource.newInstance(0, 0);
// Containers launched by the Scheduler will take a while to actually
// move to the RUNNING state, but should still be fair game for killing
@@ -138,17 +128,6 @@ public class ContainerScheduler extends AbstractService implements
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
- @VisibleForTesting
- public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
- NodeManagerMetrics metrics, int qLength) {
- super(ContainerScheduler.class.getName());
- this.context = context;
- this.dispatcher = dispatcher;
- this.metrics = metrics;
- this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
- this.opportunisticContainersStatus =
- OpportunisticContainersStatus.newInstance();
- }
@Override
public void serviceInit(Configuration conf) throws Exception {
@@ -176,16 +155,20 @@ public class ContainerScheduler extends AbstractService implements
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
- // We assume over allocation configurations have been initialized
- this.utilizationTracker = getResourceTracker();
}
- private AllocationBasedResourceTracker getResourceTracker() {
- if (context.isOverAllocationEnabled()) {
- return new UtilizationBasedResourceTracker(this);
- } else {
- return new AllocationBasedResourceTracker(this);
- }
+ @VisibleForTesting
+ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+ NodeManagerMetrics metrics, int qLength) {
+ super(ContainerScheduler.class.getName());
+ this.context = context;
+ this.dispatcher = dispatcher;
+ this.metrics = metrics;
+ this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+ this.utilizationTracker =
+ new AllocationBasedResourceUtilizationTracker(this);
+ this.opportunisticContainersStatus =
+ OpportunisticContainersStatus.newInstance();
}
/**
@@ -208,18 +191,14 @@ public class ContainerScheduler extends AbstractService implements
if (event instanceof UpdateContainerSchedulerEvent) {
onUpdateContainer((UpdateContainerSchedulerEvent) event);
} else {
- LOG.error("Unknown event type on UpdateContainer: " + event.getType());
+ LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
}
break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
case RECOVERY_COMPLETED:
- startPendingContainers(false);
- break;
- case SCHEDULE_CONTAINERS:
- startPendingContainers(true);
- break;
+ startPendingContainers(maxOppQueueLength <= 0);
default:
LOG.error("Unknown event arrived at ContainerScheduler: "
+ event.toString());
@@ -234,10 +213,10 @@ public class ContainerScheduler extends AbstractService implements
ContainerId containerId = updateEvent.getContainer().getContainerId();
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
- this.utilizationTracker.containerReleased(
+ this.utilizationTracker.subtractContainerResource(
new ContainerImpl(getConfig(), null, null, null, null,
updateEvent.getOriginalToken(), context));
- this.utilizationTracker.containerLaunched(
+ this.utilizationTracker.addContainerResources(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
@@ -253,20 +232,17 @@ public class ContainerScheduler extends AbstractService implements
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
- Resources.addTo(guaranteedResourcesDemanded,
- updateEvent.getContainer().getResource());
- startPendingContainers(true);
+ // Kill/pause opportunistic containers, if any, to make room for
+ // promotion request
+ reclaimOpportunisticContainerResources(updateEvent.getContainer());
}
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
// containers
if (queuedGuaranteedContainers.remove(containerId) != null) {
- Resources.subtractFrom(guaranteedResourcesDemanded,
- updateEvent.getContainer().getResource());
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
- startPendingContainers(false);
}
}
try {
@@ -293,7 +269,6 @@ public class ContainerScheduler extends AbstractService implements
|| rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
if (execType == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
} else if (execType == ExecutionType.OPPORTUNISTIC) {
queuedOpportunisticContainers
.put(container.getContainerId(), container);
@@ -304,7 +279,7 @@ public class ContainerScheduler extends AbstractService implements
}
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
runningContainers.put(container.getContainerId(), container);
- utilizationTracker.containerLaunched(container);
+ utilizationTracker.addContainerResources(container);
}
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
&& rcs.getCapability() != null) {
@@ -369,107 +344,65 @@ public class ContainerScheduler extends AbstractService implements
}
private void onResourcesReclaimed(Container container) {
- ContainerId containerId = container.getContainerId();
-
- // This could be killed externally for eg. by the ContainerManager,
- // in which case, the container might still be queued.
- if (queuedOpportunisticContainers.remove(containerId) != null) {
- return;
- }
+ oppContainersToKill.remove(container.getContainerId());
// This could be killed externally for eg. by the ContainerManager,
// in which case, the container might still be queued.
- if (queuedGuaranteedContainers.remove(containerId) != null) {
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
- return;
- }
-
- if (oppContainersToKill.remove(containerId) != null) {
- Resources.subtractFrom(
- opportunisticResourcesToBeReleased, container.getResource());
+ Container queued =
+ queuedOpportunisticContainers.remove(container.getContainerId());
+ if (queued == null) {
+ queuedGuaranteedContainers.remove(container.getContainerId());
}
// Requeue PAUSED containers
if (container.getContainerState() == ContainerState.PAUSED) {
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
- queuedGuaranteedContainers.put(containerId, container);
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
+ queuedGuaranteedContainers.put(container.getContainerId(), container);
} else {
- queuedOpportunisticContainers.put(containerId, container);
+ queuedOpportunisticContainers.put(
+ container.getContainerId(), container);
}
}
// decrement only if it was a running container
- Container completedContainer = runningContainers.remove(containerId);
+ Container completedContainer = runningContainers.remove(container
+ .getContainerId());
// only a running container releases resources upon completion
boolean resourceReleased = completedContainer != null;
if (resourceReleased) {
- this.utilizationTracker.containerReleased(container);
+ this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
-
- // In case of over-allocation being turned on, we may need to reclaim
- // more resources since the opportunistic containers that have been
- // killed or paused may have not released as much resource as we need.
- boolean reclaimOpportunisticResources = context.isOverAllocationEnabled();
- startPendingContainers(reclaimOpportunisticResources);
+ boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
}
}
/**
* Start pending containers in the queue.
- * @param reclaimOpportunisticResources if set to true, resources allocated
- * to running OPPORTUNISTIC containers will be reclaimed in
- * cases where there are GUARANTEED containers being queued
+ * @param forceStartGuaranteedContaieners When this is true, start guaranteed
+ * containers without looking at available resources
*/
- private void startPendingContainers(boolean reclaimOpportunisticResources) {
- // When opportunistic container not allowed (which is determined by
- // max-queue length of pending opportunistic containers <= 0), start
- // guaranteed containers without looking at available resources and
- // skip scanning the queue of opportunistic containers
- if (maxOppQueueLength <= 0) {
- forcefullyStartGuaranteedContainers();
- return;
- }
-
- Resource available = utilizationTracker.getAvailableResources();
-
- // Start guaranteed containers that are queued, if resources available.
- boolean allGuaranteedContainersLaunched =
- startGuaranteedContainers(available);
- // Start opportunistic containers, if resources available, which is true
- // if all guaranteed containers in queue have been launched.
- if (allGuaranteedContainersLaunched) {
- startOpportunisticContainers(available);
- } else {
- // If not all guaranteed containers in queue are launched, we may need
- // to reclaim resources from opportunistic containers that are running.
- if (reclaimOpportunisticResources) {
- reclaimOpportunisticContainerResources();
- }
+ private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
+ // Start queued guaranteed containers, if resources are available.
+ boolean resourcesAvailable = startContainers(
+ queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+ // Start opportunistic containers, if resources are still available.
+ if (resourcesAvailable) {
+ startContainers(queuedOpportunisticContainers.values(), false);
}
}
- /**
- * Try to launch as many GUARANTEED containers as possible.
- * @param available the amount of resources available to launch containers
- * @return true if all queued GUARANTEED containers are launched
- * or there is no GUARANTEED containers to launch
- */
- private boolean startGuaranteedContainers(Resource available) {
- Iterator<Container> cIter =
- queuedGuaranteedContainers.values().iterator();
+ private boolean startContainers(
+ Collection<Container> containersToBeStarted, boolean force) {
+ Iterator<Container> cIter = containersToBeStarted.iterator();
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
- if (isResourceAvailable(available, container)) {
- startContainer(container);
- Resources.subtractFrom(available, container.getResource());
+ if (tryStartContainer(container, force)) {
cIter.remove();
- Resources.subtractFrom(
- guaranteedResourcesDemanded, container.getResource());
} else {
resourcesAvailable = false;
}
@@ -477,49 +410,25 @@ public class ContainerScheduler extends AbstractService implements
return resourcesAvailable;
}
- /**
- * Launch all queued GUARANTEED containers without checking resource
- * availability. This is an optimization in cases where OPPORTUNISTIC
- * containers are not allowed on the node.
- */
- private void forcefullyStartGuaranteedContainers() {
- Iterator<Container> cIter =
- queuedGuaranteedContainers.values().iterator();
- while (cIter.hasNext()) {
- Container container = cIter.next();
+ private boolean tryStartContainer(Container container, boolean force) {
+ boolean containerStarted = false;
+ // call startContainer without checking available resource when force==true
+ if (force || resourceAvailableToStartContainer(
+ container)) {
startContainer(container);
- cIter.remove();
- Resources.subtractFrom(
- guaranteedResourcesDemanded, container.getResource());
+ containerStarted = true;
}
+ return containerStarted;
}
+
/**
- * Try to launch as many OPPORTUNISTIC containers as possible.
- * @param available the amount of resources available to launch containers
- * @return true if all OPPORTUNISTIC containers are launched
- * or there is no OPPORTUNISTIC containers to launch
+ * Check if there is resource available to start a given container
+ * immediately. (This can be extended to include overallocated resources)
+ * @param container the container to start
+ * @return true if container can be launched directly
*/
- private boolean startOpportunisticContainers(Resource available) {
- Iterator<Container> cIter =
- queuedOpportunisticContainers.values().iterator();
- boolean resourcesAvailable = true;
- while (cIter.hasNext() && resourcesAvailable) {
- Container container = cIter.next();
- if (isResourceAvailable(available, container)) {
- startContainer(container);
- Resources.subtractFrom(available, container.getResource());
- cIter.remove();
- } else {
- resourcesAvailable = false;
- }
- }
- return resourcesAvailable;
- }
-
- private static boolean isResourceAvailable(
- Resource resource, Container container) {
- Resource left = Resources.subtract(resource, container.getResource());
- return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0;
+ private boolean resourceAvailableToStartContainer(Container container) {
+ return this.utilizationTracker.hasResourcesAvailable(container);
}
private boolean enqueueContainer(Container container) {
@@ -529,7 +438,6 @@ public class ContainerScheduler extends AbstractService implements
boolean isQueued;
if (isGuaranteedContainer) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
@@ -574,7 +482,18 @@ public class ContainerScheduler extends AbstractService implements
// enough number of opportunistic containers.
if (isGuaranteedContainer) {
enqueueContainer(container);
- startPendingContainers(true);
+
+ // When opportunistic container not allowed (which is determined by
+ // max-queue length of pending opportunistic containers <= 0), start
+ // guaranteed containers without looking at available resources.
+ boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
+
+ // if the guaranteed container is queued, we need to preempt opportunistic
+ // containers to make room for it
+ if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
+ reclaimOpportunisticContainerResources(container);
+ }
} else {
// Given an opportunistic container, we first try to start as many queuing
// guaranteed containers as possible followed by queuing opportunistic
@@ -592,19 +511,19 @@ public class ContainerScheduler extends AbstractService implements
}
@SuppressWarnings("unchecked")
- private void reclaimOpportunisticContainerResources() {
+ private void reclaimOpportunisticContainerResources(Container container) {
List<Container> extraOppContainersToReclaim =
- pickOpportunisticContainersToReclaimResources();
- killOpportunisticContainers(extraOppContainersToReclaim);
- }
-
- private void killOpportunisticContainers(
- Collection<Container> containersToReclaim) {
- for (Container contToReclaim : containersToReclaim) {
+ pickOpportunisticContainersToReclaimResources(
+ container.getContainerId());
+ // Kill or pause the opportunistic containers that were chosen.
+ for (Container contToReclaim : extraOppContainersToReclaim) {
String preemptionAction = usePauseEventForPreemption == true ? "paused" :
- "preempted";
- LOG.info("Container {} will be {} to start the execution of guaranteed" +
- " containers.", contToReclaim.getContainerId(), preemptionAction);
+ "resumed";
+ LOG.info(
+ "Container {} will be {} to start the "
+ + "execution of guaranteed container {}.",
+ contToReclaim.getContainerId(), preemptionAction,
+ container.getContainerId());
if (usePauseEventForPreemption) {
contToReclaim.sendPauseEvent(
@@ -615,8 +534,6 @@ public class ContainerScheduler extends AbstractService implements
"Container Killed to make room for Guaranteed Container.");
}
oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
- Resources.addTo(
- opportunisticResourcesToBeReleased, contToReclaim.getResource());
}
}
@@ -625,7 +542,7 @@ public class ContainerScheduler extends AbstractService implements
// Skip to put into runningContainers and addUtilization when recover
if (!runningContainers.containsKey(container.getContainerId())) {
runningContainers.put(container.getContainerId(), container);
- this.utilizationTracker.containerLaunched(container);
+ this.utilizationTracker.addContainerResources(container);
}
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
@@ -634,12 +551,14 @@ public class ContainerScheduler extends AbstractService implements
container.sendLaunchEvent();
}
- private List<Container> pickOpportunisticContainersToReclaimResources() {
+ private List<Container> pickOpportunisticContainersToReclaimResources(
+ ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.
List<Container> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
- ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
+ ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+ containerToStartId);
// Go over the running opportunistic containers.
// Use a descending iterator to kill more recently started containers.
@@ -658,19 +577,15 @@ public class ContainerScheduler extends AbstractService implements
continue;
}
extraOpportContainersToKill.add(runningCont);
- // In the case of over-allocation, the running container may not
- // release as much resources as it has requested, but we'll check
- // again if more containers need to be killed/paused when this
- // container is released.
ContainersMonitor.decreaseResourceUtilization(
getContainersMonitor(), resourcesToFreeUp,
runningCont.getResource());
}
}
if (!hasSufficientResources(resourcesToFreeUp)) {
- LOG.warn("There are no sufficient resources to start guaranteed" +
- " containers at the moment. Opportunistic containers are in" +
- " the process of being killed to make room.");
+ LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
+ "at the moment. Opportunistic containers are in the process of" +
+ "being killed to make room.", containerToStartId);
}
return extraOpportContainersToKill;
}
@@ -685,42 +600,34 @@ public class ContainerScheduler extends AbstractService implements
* getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
}
- /**
- * Determine how much resources are needed to be freed up to launch the given
- * GUARANTEED container. Used to determine how many running OPPORTUNISTIC
- * containers need to be killed/paused, assuming OPPORTUNISTIC containers to
- * be killed/paused will release the amount of resources they have requested.
- *
- * If the node is over-allocating itself, this may cause not enough
- * OPPORTUNISTIC containers being killed/paused in cases where the running
- * OPPORTUNISTIC containers are not consuming fully their resource requests.
- * We'd check again upon container completion events to see if more running
- * OPPORTUNISTIC containers need to be killed/paused.
- *
- * @return the amount of resource needed to be reclaimed for this container
- */
- private ResourceUtilization resourcesToFreeUp() {
+ private ResourceUtilization resourcesToFreeUp(
+ ContainerId containerToStartId) {
// Get allocation of currently allocated containers.
ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
- .newInstance(0, 0, 0.0f);
-
- // Add to the allocation the allocation of pending guaranteed containers.
- ContainersMonitor.increaseResourceUtilization(getContainersMonitor(),
- resourceAllocationToFreeUp, guaranteedResourcesDemanded);
+ .newInstance(this.utilizationTracker.getCurrentUtilization());
+
+ // Add to the allocation the allocation of the pending guaranteed
+ // containers that will start before the current container will be started.
+ for (Container container : queuedGuaranteedContainers.values()) {
+ ContainersMonitor.increaseResourceUtilization(
+ getContainersMonitor(), resourceAllocationToFreeUp,
+ container.getResource());
+ if (container.getContainerId().equals(containerToStartId)) {
+ break;
+ }
+ }
// These resources are being freed, likely at the behest of another
// guaranteed container..
- ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
- resourceAllocationToFreeUp, opportunisticResourcesToBeReleased);
-
- // Deduct any remaining resources available
- Resource availableResources = utilizationTracker.getAvailableResources();
- if (availableResources.getVirtualCores() > 0 &&
- availableResources.getMemorySize() > 0) {
- ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
- resourceAllocationToFreeUp, availableResources);
+ for (Container container : oppContainersToKill.values()) {
+ ContainersMonitor.decreaseResourceUtilization(
+ getContainersMonitor(), resourceAllocationToFreeUp,
+ container.getResource());
}
+ // Subtract the overall node resources.
+ getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+ resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 9ad4f91..294eddf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -28,7 +28,5 @@ public enum ContainerSchedulerEventType {
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
CONTAINER_PAUSED,
- RECOVERY_COMPLETED,
- // Producer: Containers Monitor when over-allocation is on
- SCHEDULE_CONTAINERS
+ RECOVERY_COMPLETED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
deleted file mode 100644
index 58b73d2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-
-/**
- * Keeps track of containers utilization over time and determines how much
- * resources are available to launch containers when over-allocation is on.
- */
-public abstract class NMAllocationPolicy {
- protected final ResourceThresholds overAllocationThresholds;
- protected final ContainersMonitor containersMonitor;
-
- public NMAllocationPolicy(
- ResourceThresholds overAllocationThresholds,
- ContainersMonitor containersMonitor) {
- this.containersMonitor = containersMonitor;
- this.overAllocationThresholds = overAllocationThresholds;
- }
-
- /**
- * Handle container launch events.
- * @param container the container that has been launched
- */
- public void containerLaunched(Container container) {
-
- }
-
- /**
- * Handle container release events.
- * @param container the container that has been released
- */
- public void containerReleased(Container container) {
-
- }
-
- /**
- * Get the amount of resources to launch containers when
- * over-allocation is turned on.
- * @return the amount of resources available to launch containers
- */
- public abstract Resource getAvailableResources();
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 98d99c6..3c17eca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -39,20 +38,22 @@ public interface ResourceUtilizationTracker {
ResourceUtilization getCurrentUtilization();
/**
- * Get the amount of resources currently available to launch containers.
- * @return Resource resources available to launch containers
+ * Add Container's resources to Node Utilization.
+ * @param container Container.
*/
- Resource getAvailableResources();
+ void addContainerResources(Container container);
/**
- * Add Container's resources to Node Utilization upon container launch.
+ * Subtract Container's resources from Node Utilization.
* @param container Container.
*/
- void containerLaunched(Container container);
+ void subtractContainerResource(Container container);
/**
- * Subtract Container's resources to Node Utilization upon container release.
+ * Check if NM has resources available currently to run the container.
* @param container Container.
+ * @return True, if NM has resources available currently to run the container.
*/
- void containerReleased(Container container);
+ boolean hasResourcesAvailable(Container container);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
deleted file mode 100644
index f486506..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-
-/**
- * An implementation of NMAllocationPolicy based on the
- * snapshot of the latest containers utilization to determine how much
- * resources are available * to launch containers when over-allocation
- * is turned on.
- */
-public class SnapshotBasedOverAllocationPolicy
- extends NMAllocationPolicy {
-
- public SnapshotBasedOverAllocationPolicy(
- ResourceThresholds overAllocationThresholds,
- ContainersMonitor containersMonitor) {
- super(overAllocationThresholds, containersMonitor);
- }
-
- @Override
- public Resource getAvailableResources() {
- ResourceUtilization utilization =
- containersMonitor.getContainersUtilization(true).getUtilization();
- long memoryAvailable = Math.round(
- overAllocationThresholds.getMemoryThreshold() *
- containersMonitor.getPmemAllocatedForContainers()) -
- (utilization.getPhysicalMemory() << 20);
- int vcoreAvailable = Math.round(
- (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) *
- containersMonitor.getVCoresAllocatedForContainers());
- return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
deleted file mode 100644
index 6f9bc82..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-* An resource availability tracker that determines if there are resources
-* available based on if there are unallocated resources or if there are
-* un-utilized resources.
-*/
-public class UtilizationBasedResourceTracker
- extends AllocationBasedResourceTracker {
- private static final Logger LOG =
- LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
-
- private final NMAllocationPolicy overAllocationPolicy;
-
- UtilizationBasedResourceTracker(ContainerScheduler scheduler) {
- super(scheduler);
- this.overAllocationPolicy =
- getContainersMonitor().getContainerOverAllocationPolicy();
- }
-
- @Override
- public void containerLaunched(Container container) {
- super.containerLaunched(container);
- if (overAllocationPolicy != null) {
- overAllocationPolicy.containerLaunched(container);
- }
- }
-
- @Override
- public void containerReleased(Container container) {
- super.containerReleased(container);
- if (overAllocationPolicy != null) {
- overAllocationPolicy.containerReleased(container);
- }
- }
-
- @Override
- public Resource getAvailableResources() {
- Resource resourceBasedOnAllocation = getUnallocatedResources();
- Resource resourceBasedOnUtilization =
- getResourcesAvailableBasedOnUtilization();
- if (LOG.isDebugEnabled()) {
- LOG.debug("The amount of resources available based on allocation is " +
- resourceBasedOnAllocation + ", based on utilization is " +
- resourceBasedOnUtilization);
- }
-
- return Resources.componentwiseMax(resourceBasedOnAllocation,
- resourceBasedOnUtilization);
- }
-
- /**
- * Get the amount of resources based on the slack between
- * the actual utilization and desired utilization.
- * @return Resource resource available
- */
- private Resource getResourcesAvailableBasedOnUtilization() {
- if (overAllocationPolicy == null) {
- return Resources.none();
- }
-
- return overAllocationPolicy.getAvailableResources();
- }
-
- @Override
- public ResourceUtilization getCurrentUtilization() {
- return getContainersMonitor().getContainersUtilization(false)
- .getUtilization();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 92613ed..493aa4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
-import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -346,40 +345,6 @@ public abstract class BaseContainerManagerTest {
fStates.contains(containerStatus.getState()));
}
- public static void waitForContainerSubState(
- ContainerManagementProtocol containerManager, ContainerId containerID,
- ContainerSubState finalState)
- throws InterruptedException, YarnException, IOException {
- waitForContainerSubState(containerManager, containerID,
- Arrays.asList(finalState), 20);
- }
- public static void waitForContainerSubState(
- ContainerManagementProtocol containerManager, ContainerId containerID,
- List<ContainerSubState> finalStates, int timeOutMax)
- throws InterruptedException, YarnException, IOException {
- List<ContainerId> list = new ArrayList<>();
- list.add(containerID);
- GetContainerStatusesRequest request =
- GetContainerStatusesRequest.newInstance(list);
- ContainerStatus containerStatus;
- HashSet<ContainerSubState> fStates = new HashSet<>(finalStates);
- int timeoutSecs = 0;
- do {
- Thread.sleep(1000);
- containerStatus =
- containerManager.getContainerStatuses(request)
- .getContainerStatuses().get(0);
- LOG.info("Waiting for container to get into one of states " + fStates
- + ". Current state is " + containerStatus.getContainerSubState());
- timeoutSecs += 1;
- } while (!fStates.contains(containerStatus.getContainerSubState())
- && timeoutSecs < timeOutMax);
- LOG.info("Container state is " + containerStatus.getContainerSubState());
- Assert.assertTrue("ContainerSubState is not correct (timedout)",
- fStates.contains(containerStatus.getContainerSubState()));
- }
-
-
public static void waitForApplicationState(
ContainerManagerImpl containerManager, ApplicationId appID,
ApplicationState finalState)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index c071283..8aee532 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -288,7 +288,7 @@ public class TestContainersMonitorResourceChange {
// will be 0.
assertEquals(
"Resource utilization must be default with MonitorThread's first run",
- 0, containersMonitor.getContainersUtilization(false).getUtilization()
+ 0, containersMonitor.getContainersUtilization()
.compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
// Verify the container utilization value. Since atleast one round is done,
@@ -303,9 +303,8 @@ public class TestContainersMonitorResourceChange {
ContainersMonitorImpl containersMonitor, int timeoutMsecs)
throws InterruptedException {
int timeWaiting = 0;
- while (0 == containersMonitor.getContainersUtilization(false)
- .getUtilization().compareTo(
- ResourceUtilization.newInstance(0, 0, 0.0f))) {
+ while (0 == containersMonitor.getContainersUtilization()
+ .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) {
if (timeWaiting >= timeoutMsecs) {
break;
}
@@ -317,7 +316,7 @@ public class TestContainersMonitorResourceChange {
}
assertTrue("Resource utilization is not changed from second run onwards",
- 0 != containersMonitor.getContainersUtilization(false).getUtilization()
+ 0 != containersMonitor.getContainersUtilization()
.compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
deleted file mode 100644
index 1e8bfdf..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the {@link AllocationBasedResourceTracker} class.
- */
-public class TestAllocationBasedResourceTracker {
-
- private ContainerScheduler mockContainerScheduler;
-
- @Before
- public void setup() {
- mockContainerScheduler = mock(ContainerScheduler.class);
- ContainersMonitor containersMonitor =
- new ContainersMonitorImpl(mock(ContainerExecutor.class),
- mock(AsyncDispatcher.class), mock(Context.class));
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
- conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
- conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
- conf.setInt(YarnConfiguration.NM_VCORES, 8);
- containersMonitor.init(conf);
- when(mockContainerScheduler.getContainersMonitor())
- .thenReturn(containersMonitor);
- }
-
- /**
- * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
- * hasResourceAvailable should return false.
- */
- @Test
- public void testHasResourcesAvailable() {
- AllocationBasedResourceTracker tracker =
- new AllocationBasedResourceTracker(mockContainerScheduler);
- Container testContainer = mock(Container.class);
- when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
- for (int i = 0; i < 2; i++) {
- Assert.assertTrue(
- isResourcesAvailable(tracker.getAvailableResources(), testContainer));
- tracker.containerLaunched(testContainer);
- }
- Assert.assertFalse(
- isResourcesAvailable(tracker.getAvailableResources(), testContainer));
- }
-
- private static boolean isResourcesAvailable(
- Resource available, Container container) {
- return available.compareTo(container.getResource()) >= 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8851333/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..82c2147
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link AllocationBasedResourceUtilizationTracker} class.
+ */
+public class TestAllocationBasedResourceUtilizationTracker {
+
+ private ContainerScheduler mockContainerScheduler;
+
+ @Before
+ public void setup() {
+ mockContainerScheduler = mock(ContainerScheduler.class);
+ ContainersMonitor containersMonitor =
+ new ContainersMonitorImpl(mock(ContainerExecutor.class),
+ mock(AsyncDispatcher.class), mock(Context.class));
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
+ conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+ conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
+ conf.setInt(YarnConfiguration.NM_VCORES, 8);
+ containersMonitor.init(conf);
+ when(mockContainerScheduler.getContainersMonitor())
+ .thenReturn(containersMonitor);
+ }
+
+ /**
+ * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
+ * hasResourceAvailable should return false.
+ */
+ @Test
+ public void testHasResourcesAvailable() {
+ AllocationBasedResourceUtilizationTracker tracker =
+ new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
+ Container testContainer = mock(Container.class);
+ when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(tracker.hasResourcesAvailable(testContainer));
+ tracker.addContainerResources(testContainer);
+ }
+ Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
+ }
+
+ /**
+ * Test the case where the current allocation has been truncated to 0.8888891
+ * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return
+ * true.
+ */
+ @Test
+ public void testHasEnoughCpu() {
+ AllocationBasedResourceUtilizationTracker tracker =
+ new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
+ float currentAllocation = 0.8888891f;
+ long totalCores = 9;
+ int alreadyUsedCores = 8;
+ Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores,
+ (int) totalCores - alreadyUsedCores));
+ Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores,
+ (int) totalCores - alreadyUsedCores + 1));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org