You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/28 22:27:43 UTC
[16/24] hadoop git commit: YARN-6675. Add NM support to launch
opportunistic containers based on overallocation. Contributed by Haibo Chen.
YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/879a5e7b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/879a5e7b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/879a5e7b
Branch: refs/heads/YARN-1011
Commit: 879a5e7b3a89e34305c699e52a8e5da288505c6c
Parents: 0456591
Author: Miklos Szegedi <sz...@apache.org>
Authored: Fri Apr 20 14:15:29 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 28 14:14:58 2018 -0700
----------------------------------------------------------------------
.../nodemanager/NodeStatusUpdaterImpl.java | 2 +-
.../containermanager/ContainerManagerImpl.java | 8 +-
.../launcher/ContainerLaunch.java | 2 +-
.../launcher/ContainersLauncher.java | 9 +-
.../monitor/ContainersMonitor.java | 38 +-
.../monitor/ContainersMonitorImpl.java | 56 +-
.../AllocationBasedResourceTracker.java | 114 ++
...locationBasedResourceUtilizationTracker.java | 158 ---
.../scheduler/ContainerScheduler.java | 317 +++--
.../scheduler/ContainerSchedulerEventType.java | 4 +-
.../scheduler/NMAllocationPolicy.java | 63 +
.../scheduler/ResourceUtilizationTracker.java | 17 +-
.../SnapshotBasedOverAllocationPolicy.java | 54 +
.../UtilizationBasedResourceTracker.java | 95 ++
.../BaseContainerManagerTest.java | 35 +
.../TestContainersMonitorResourceChange.java | 9 +-
.../TestAllocationBasedResourceTracker.java | 82 ++
...locationBasedResourceUtilizationTracker.java | 93 --
.../TestContainerSchedulerRecovery.java | 58 +-
...estContainerSchedulerWithOverAllocation.java | 1121 ++++++++++++++++++
20 files changed, 1916 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 572684e..d757376 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -543,7 +543,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private ResourceUtilization getContainersUtilization() {
ContainersMonitor containersMonitor =
this.context.getContainerManager().getContainersMonitor();
- return containersMonitor.getContainersUtilization();
+ return containersMonitor.getContainersUtilization(false).getUtilization();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 27a7c80..a08e227 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -245,6 +245,12 @@ public class ContainerManagerImpl extends CompositeService implements
metrics);
addService(rsrcLocalizationSrvc);
+ this.containersMonitor = createContainersMonitor(exec);
+ addService(this.containersMonitor);
+
+ // ContainersLauncher must be added after ContainersMonitor
+ // because the former depends on the latter to initialize
+ // over-allocation first.
containersLauncher = createContainersLauncher(context, exec);
addService(containersLauncher);
@@ -269,8 +275,6 @@ public class ContainerManagerImpl extends CompositeService implements
nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher);
}
- this.containersMonitor = createContainersMonitor(exec);
- addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 6347d4e..0228332 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1079,7 +1079,7 @@ public class ContainerLaunch implements Callable<Integer> {
* @return Process ID
* @throws Exception
*/
- private String getContainerPid(Path pidFilePath) throws Exception {
+ protected String getContainerPid(Path pidFilePath) throws Exception {
String containerIdStr =
container.getContainerId().toString();
String processId = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 7870f86..2f5acfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -121,8 +121,7 @@ public class ContainersLauncher extends AbstractService
containerId.getApplicationAttemptId().getApplicationId());
ContainerLaunch launch =
- new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
- event.getContainer(), dirsHandler, containerManager);
+ createContainerLaunch(app, event.getContainer());
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
@@ -225,4 +224,10 @@ public class ContainersLauncher extends AbstractService
break;
}
}
+
+ protected ContainerLaunch createContainerLaunch(
+ Application app, Container container) {
+ return new ContainerLaunch(context, getConfig(), dispatcher,
+ exec, app, container, dirsHandler, containerManager);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 64831e9..8da4ec4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -23,10 +23,24 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView {
- ResourceUtilization getContainersUtilization();
+
+ /**
+ * Get the aggregate resource utilization of containers running on the node,
+ * with a timestamp of the measurement.
+ * @param latest true if the latest result should be returned
+ * @return ResourceUtilization resource utilization of all containers
+ */
+ ContainersResourceUtilization getContainersUtilization(boolean latest);
+
+ /**
+ * Get the policy to over-allocate containers when over-allocation is on.
+ * @return null if over-allocation is turned off
+ */
+ NMAllocationPolicy getContainerOverAllocationPolicy();
float getVmemRatio();
@@ -66,4 +80,26 @@ public interface ContainersMonitor extends Service,
* containersMonitor.getVmemRatio());
resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
}
+
+  /**
+   * A timestamped snapshot of the aggregate utilization of all containers.
+   */
+  final class ContainersResourceUtilization {
+    private final long timestamp;
+    private final ResourceUtilization utilization;
+
+    public ContainersResourceUtilization(
+        ResourceUtilization snapshot, long measuredAt) {
+      timestamp = measuredAt;
+      utilization = snapshot;
+    }
+
+    public ResourceUtilization getUtilization() {
+      return utilization;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 7873882..a045d78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,8 +116,9 @@ public class ContainersMonitorImpl extends AbstractService implements
CPU, MEMORY
}
- private ResourceUtilization containersUtilization;
+ private ContainersResourceUtilization latestContainersUtilization;
+ private NMAllocationPolicy overAllocationPolicy;
private ResourceThresholds overAllocationPreemptionThresholds;
private int overAlloctionPreemptionCpuCount = -1;
@@ -129,7 +134,8 @@ public class ContainersMonitorImpl extends AbstractService implements
this.monitoringThread = new MonitoringThread();
- this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+ this.latestContainersUtilization = new ContainersResourceUtilization(
+ ResourceUtilization.newInstance(-1, -1, -1.0f), -1L);
}
@Override
@@ -365,6 +371,10 @@ public class ContainersMonitorImpl extends AbstractService implements
this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance(
cpuPreemptionThreshold, memoryPreemptionThreshold);
+ // TODO make this configurable
+ this.overAllocationPolicy =
+ createOverAllocationPolicy(resourceThresholds);
+
LOG.info("NodeManager oversubscription enabled with overallocation " +
"thresholds (memory:" + overAllocationMemoryUtilizationThreshold +
", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" +
@@ -372,6 +382,11 @@ public class ContainersMonitorImpl extends AbstractService implements
cpuPreemptionThreshold + ")");
}
+ protected NMAllocationPolicy createOverAllocationPolicy(
+ ResourceThresholds resourceThresholds) {
+ return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
+ }
+
private boolean isResourceCalculatorAvailable() {
if (resourceCalculatorPlugin == null) {
LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -655,7 +670,12 @@ public class ContainersMonitorImpl extends AbstractService implements
}
// Save the aggregated utilization of the containers
- setContainersUtilization(trackedContainersUtilization);
+ setLatestContainersUtilization(trackedContainersUtilization);
+
+ // check opportunity to start containers if over-allocation is on
+ if (context.isOverAllocationEnabled()) {
+ attemptToStartContainersUponLowUtilization();
+ }
// Publish the container utilization metrics to node manager
// metrics system.
@@ -1045,12 +1065,34 @@ public class ContainersMonitorImpl extends AbstractService implements
}
@Override
- public ResourceUtilization getContainersUtilization() {
- return this.containersUtilization;
+ public ContainersResourceUtilization getContainersUtilization(
+ boolean latest) {
+ // TODO update containerUtilization if latest is true
+ return this.latestContainersUtilization;
+ }
+
+ @Override
+ public NMAllocationPolicy getContainerOverAllocationPolicy() {
+ return overAllocationPolicy;
+ }
+
+ private void setLatestContainersUtilization(ResourceUtilization utilization) {
+ this.latestContainersUtilization = new ContainersResourceUtilization(
+ utilization, System.currentTimeMillis());
}
- private void setContainersUtilization(ResourceUtilization utilization) {
- this.containersUtilization = utilization;
+ @VisibleForTesting
+ public void attemptToStartContainersUponLowUtilization() {
+ if (getContainerOverAllocationPolicy() != null) {
+ Resource available = getContainerOverAllocationPolicy()
+ .getAvailableResources();
+ if (available.getMemorySize() > 0 &&
+ available.getVirtualCores() > 0) {
+ eventDispatcher.getEventHandler().handle(
+ new ContainerSchedulerEvent(null,
+ ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
new file mode 100644
index 0000000..86b3698
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the resource utilization tracker that equates
+ * resource utilization with the total resources allocated to containers.
+ */
+public class AllocationBasedResourceTracker
+    implements ResourceUtilizationTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
+
+  // Sum of the resources of containers launched and not yet released.
+  private ResourceUtilization containersAllocation;
+  private ContainerScheduler scheduler;
+
+  AllocationBasedResourceTracker(ContainerScheduler scheduler) {
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Get the accumulation of totally allocated resources to containers.
+   * @return the live accumulated allocation; this is not a copy.
+   */
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * Get the amount of resources that have not been allocated to containers.
+   * @return a new Resource with the unallocated memory and vcores, or zero
+   *         for both if the allocation exceeds node capacity in CPU,
+   *         physical memory or virtual memory.
+   */
+  protected Resource getUnallocatedResources() {
+    // unallocated resources = node capacity - containers allocation
+    // = -(container allocation - node capacity)
+    ResourceUtilization allocationClone =
+        ResourceUtilization.newInstance(containersAllocation);
+    getContainersMonitor()
+        .subtractNodeResourcesFromResourceUtilization(allocationClone);
+
+    if (allocationClone.getCPU() <= 0 &&
+        allocationClone.getPhysicalMemory() <= 0 &&
+        allocationClone.getVirtualMemory() <= 0) {
+      // CPU is a fraction of the vcores for containers; convert to vcores.
+      int cpu = Math.round(allocationClone.getCPU() *
+          getContainersMonitor().getVCoresAllocatedForContainers());
+      long memory = allocationClone.getPhysicalMemory();
+      return Resource.newInstance(-memory, -cpu);
+    }
+    // Fresh instance: Resource is mutable, a shared constant could be altered.
+    return Resource.newInstance(0, 0);
+  }
+
+  /** Available resources are exactly the unallocated resources. */
+  @Override
+  public Resource getAvailableResources() {
+    return getUnallocatedResources();
+  }
+
+  /**
+   * Add Container's resources to the accumulated allocation.
+   * @param container Container.
+   */
+  @Override
+  public void containerLaunched(Container container) {
+    ContainersMonitor.increaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Subtract Container's resources from the accumulated allocation.
+   * @param container Container.
+   */
+  @Override
+  public void containerReleased(Container container) {
+    ContainersMonitor.decreaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  public ContainersMonitor getContainersMonitor() {
+    return this.scheduler.getContainersMonitor();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
deleted file mode 100644
index 6e2b617..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the {@link ResourceUtilizationTracker} that equates
- * resource utilization with the total resource allocated to the container.
- */
-public class AllocationBasedResourceUtilizationTracker implements
- ResourceUtilizationTracker {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
-
- private ResourceUtilization containersAllocation;
- private ContainerScheduler scheduler;
-
- AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
- this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
- this.scheduler = scheduler;
- }
-
- /**
- * Get the accumulation of totally allocated resources to a container.
- * @return ResourceUtilization Resource Utilization.
- */
- @Override
- public ResourceUtilization getCurrentUtilization() {
- return this.containersAllocation;
- }
-
- /**
- * Add Container's resources to the accumulated Utilization.
- * @param container Container.
- */
- @Override
- public void addContainerResources(Container container) {
- ContainersMonitor.increaseResourceUtilization(
- getContainersMonitor(), this.containersAllocation,
- container.getResource());
- }
-
- /**
- * Subtract Container's resources to the accumulated Utilization.
- * @param container Container.
- */
- @Override
- public void subtractContainerResource(Container container) {
- ContainersMonitor.decreaseResourceUtilization(
- getContainersMonitor(), this.containersAllocation,
- container.getResource());
- }
-
- /**
- * Check if NM has resources available currently to run the container.
- * @param container Container.
- * @return True, if NM has resources available currently to run the container.
- */
- @Override
- public boolean hasResourcesAvailable(Container container) {
- long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
- return hasResourcesAvailable(pMemBytes,
- (long) (getContainersMonitor().getVmemRatio()* pMemBytes),
- container.getResource().getVirtualCores());
- }
-
- private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
- int cpuVcores) {
- // Check physical memory.
- if (LOG.isDebugEnabled()) {
- LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
- this.containersAllocation.getPhysicalMemory(),
- (pMemBytes >> 20),
- (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
- }
- if (this.containersAllocation.getPhysicalMemory() +
- (int) (pMemBytes >> 20) >
- (int) (getContainersMonitor()
- .getPmemAllocatedForContainers() >> 20)) {
- return false;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("before vMemCheck" +
- "[isEnabled={}, current={} + asked={} > allowed={}]",
- getContainersMonitor().isVmemCheckEnabled(),
- this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
- (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
- }
- // Check virtual memory.
- if (getContainersMonitor().isVmemCheckEnabled() &&
- this.containersAllocation.getVirtualMemory() +
- (int) (vMemBytes >> 20) >
- (int) (getContainersMonitor()
- .getVmemAllocatedForContainers() >> 20)) {
- return false;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("before cpuCheck [asked={} > allowed={}]",
- this.containersAllocation.getCPU(),
- getContainersMonitor().getVCoresAllocatedForContainers());
- }
- // Check CPU. Compare using integral values of cores to avoid decimal
- // inaccuracies.
- if (!hasEnoughCpu(this.containersAllocation.getCPU(),
- getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) {
- return false;
- }
- return true;
- }
-
- /**
- * Returns whether there is enough space for coresRequested in totalCores.
- * Converts currentAllocation usage to nearest integer count before comparing,
- * as floats are inherently imprecise. NOTE: this calculation assumes that
- * requested core counts must be integers, and currentAllocation core count
- * must also be an integer.
- *
- * @param currentAllocation The current allocation, a float value from 0 to 1.
- * @param totalCores The total cores in the system.
- * @param coresRequested The number of cores requested.
- * @return True if currentAllocationtotalCores*coresRequested <=
- * totalCores.
- */
- public boolean hasEnoughCpu(float currentAllocation, long totalCores,
- int coresRequested) {
- // Must not cast here, as it would truncate the decimal digits.
- return Math.round(currentAllocation * totalCores)
- + coresRequested <= totalCores;
- }
-
- public ContainersMonitor getContainersMonitor() {
- return this.scheduler.getContainersMonitor();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index a61b9d1..0bebe44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +82,10 @@ public class ContainerScheduler extends AbstractService implements
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedGuaranteedContainers = new LinkedHashMap<>();
+ // Sum of the resources requested by all queued GUARANTEED containers.
+ private final Resource guaranteedResourcesDemanded =
+ Resource.newInstance(0, 0);
+
// Queue of Opportunistic Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedOpportunisticContainers = new LinkedHashMap<>();
@@ -88,6 +94,10 @@ public class ContainerScheduler extends AbstractService implements
// or paused to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
+ // Sum of the resources expected back from OPPORTUNISTIC containers that
+ // have been marked to be killed or paused but have not yet released them.
+ private final Resource opportunisticResourcesToBeReleased =
+ Resource.newInstance(0, 0);
// Containers launched by the Scheduler will take a while to actually
// move to the RUNNING state, but should still be fair game for killing
@@ -128,6 +138,21 @@ public class ContainerScheduler extends AbstractService implements
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
+ /**
+ * Constructor for tests. The resource utilization tracker is not created
+ * here; serviceInit creates it.
+ */
+ @VisibleForTesting
+ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+ NodeManagerMetrics metrics, int qLength) {
+ super(ContainerScheduler.class.getName());
+ this.context = context;
+ this.dispatcher = dispatcher;
+ this.metrics = metrics;
+ this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+ this.opportunisticContainersStatus =
+ OpportunisticContainersStatus.newInstance();
+ }
@Override
public void serviceInit(Configuration conf) throws Exception {
@@ -155,20 +176,21 @@ public class ContainerScheduler extends AbstractService implements
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
+ // Over-allocation config must already be initialized: the tracker type depends on it.
+ this.utilizationTracker = getResourceTracker();
}
- @VisibleForTesting
- public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
- NodeManagerMetrics metrics, int qLength) {
- super(ContainerScheduler.class.getName());
- this.context = context;
- this.dispatcher = dispatcher;
- this.metrics = metrics;
- this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
- this.utilizationTracker =
- new AllocationBasedResourceUtilizationTracker(this);
- this.opportunisticContainersStatus =
- OpportunisticContainersStatus.newInstance();
+ /**
+ * Choose the resource tracker for this node: a utilization-based tracker
+ * when over-allocation is enabled, otherwise an allocation-based one.
+ * @return the tracker used to decide whether queued containers can start
+ */
+ private AllocationBasedResourceTracker getResourceTracker() {
+ if (context.isOverAllocationEnabled()) {
+ return new UtilizationBasedResourceTracker(this);
+ } else {
+ return new AllocationBasedResourceTracker(this);
+ }
}
/**
@@ -191,14 +208,21 @@ public class ContainerScheduler extends AbstractService implements
if (event instanceof UpdateContainerSchedulerEvent) {
onUpdateContainer((UpdateContainerSchedulerEvent) event);
} else {
- LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
+ LOG.error("Unknown event type on UpdateContainer: " + event.getType());
}
break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
case RECOVERY_COMPLETED:
- startPendingContainers(maxOppQueueLength <= 0);
+ // Launch what fits without reclaiming OPPORTUNISTIC resources.
+ startPendingContainers(false);
+ break;
+ case SCHEDULE_CONTAINERS:
+ // Sent by the ContainersMonitor when over-allocation is on; reclaim from
+ // OPPORTUNISTIC containers if GUARANTEED containers remain queued.
+ startPendingContainers(true);
+ break;
default:
LOG.error("Unknown event arrived at ContainerScheduler: "
+ event.toString());
@@ -213,10 +234,11 @@ public class ContainerScheduler extends AbstractService implements
ContainerId containerId = updateEvent.getContainer().getContainerId();
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
+ // Replace the tracked allocation of the original token with the updated one.
- this.utilizationTracker.subtractContainerResource(
+ this.utilizationTracker.containerReleased(
new ContainerImpl(getConfig(), null, null, null, null,
updateEvent.getOriginalToken(), context));
- this.utilizationTracker.addContainerResources(
+ this.utilizationTracker.containerLaunched(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
@@ -232,17 +253,23 @@ public class ContainerScheduler extends AbstractService implements
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
- //Kill/pause opportunistic containers if any to make room for
- // promotion request
- reclaimOpportunisticContainerResources(updateEvent.getContainer());
+ // The promoted container now counts toward queued GUARANTEED demand;
+ // try to launch it, reclaiming OPPORTUNISTIC resources if still queued.
+ Resources.addTo(guaranteedResourcesDemanded,
+ updateEvent.getContainer().getResource());
+ startPendingContainers(true);
}
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
// containers
if (queuedGuaranteedContainers.remove(containerId) != null) {
+ // It no longer counts toward queued GUARANTEED demand.
+ Resources.subtractFrom(guaranteedResourcesDemanded,
+ updateEvent.getContainer().getResource());
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
+ startPendingContainers(false);
}
}
try {
@@ -269,6 +293,7 @@ public class ContainerScheduler extends AbstractService implements
|| rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
if (execType == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
+ Resources.addTo(guaranteedResourcesDemanded, container.getResource());
} else if (execType == ExecutionType.OPPORTUNISTIC) {
queuedOpportunisticContainers
.put(container.getContainerId(), container);
@@ -279,7 +304,7 @@ public class ContainerScheduler extends AbstractService implements
}
} else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
runningContainers.put(container.getContainerId(), container);
- utilizationTracker.addContainerResources(container);
+ utilizationTracker.containerLaunched(container);
}
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
&& rcs.getCapability() != null) {
@@ -344,65 +369,108 @@ public class ContainerScheduler extends AbstractService implements
}
private void onResourcesReclaimed(Container container) {
- oppContainersToKill.remove(container.getContainerId());
+ ContainerId containerId = container.getContainerId();
+
+ // A queued OPPORTUNISTIC container may be killed externally (e.g. by the
+ // ContainerManager); it is not in runningContainers, so nothing to release.
+ if (queuedOpportunisticContainers.remove(containerId) != null) {
+ return;
+ }
// This could be killed externally for eg. by the ContainerManager,
// in which case, the container might still be queued.
- Container queued =
- queuedOpportunisticContainers.remove(container.getContainerId());
- if (queued == null) {
- queuedGuaranteedContainers.remove(container.getContainerId());
+ if (queuedGuaranteedContainers.remove(containerId) != null) {
+ // No longer queued, so its resource must leave the queued demand.
+ Resources.subtractFrom(guaranteedResourcesDemanded, container.getResource());
+ return;
+ }
+
+ if (oppContainersToKill.remove(containerId) != null) {
+ Resources.subtractFrom(
+ opportunisticResourcesToBeReleased, container.getResource());
}
// Requeue PAUSED containers
if (container.getContainerState() == ContainerState.PAUSED) {
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
- queuedGuaranteedContainers.put(container.getContainerId(), container);
+ queuedGuaranteedContainers.put(containerId, container);
+ Resources.addTo(guaranteedResourcesDemanded, container.getResource());
} else {
- queuedOpportunisticContainers.put(
- container.getContainerId(), container);
+ queuedOpportunisticContainers.put(containerId, container);
}
}
// decrement only if it was a running container
- Container completedContainer = runningContainers.remove(container
- .getContainerId());
+ Container completedContainer = runningContainers.remove(containerId);
// only a running container releases resources upon completion
boolean resourceReleased = completedContainer != null;
if (resourceReleased) {
- this.utilizationTracker.subtractContainerResource(container);
+ this.utilizationTracker.containerReleased(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
- boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
- startPendingContainers(forceStartGuaranteedContainers);
+
+ // In case of over-allocation being turned on, we may need to reclaim
+ // more resources since the opportunistic containers that have been
+ // killed or paused may have not released as much resource as we need.
+ boolean reclaimOpportunisticResources = context.isOverAllocationEnabled();
+ startPendingContainers(reclaimOpportunisticResources);
}
}
/**
* Start pending containers in the queue.
- * @param forceStartGuaranteedContaieners When this is true, start guaranteed
- * container without looking at available resource
+ * @param reclaimOpportunisticResources if set to true, resources allocated
+ * to running OPPORTUNISTIC containers will be reclaimed in
+ * cases where there are GUARANTEED containers being queued
*/
- private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
- // Start guaranteed containers that are paused, if resources available.
- boolean resourcesAvailable = startContainers(
- queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
- // Start opportunistic containers, if resources available.
- if (resourcesAvailable) {
- startContainers(queuedOpportunisticContainers.values(), false);
+ private void startPendingContainers(boolean reclaimOpportunisticResources) {
+ // When OPPORTUNISTIC containers are not allowed (max-queue length of
+ // pending OPPORTUNISTIC containers <= 0), start queued GUARANTEED
+ // containers without checking available resources and skip scanning
+ // the OPPORTUNISTIC queue.
+ if (maxOppQueueLength <= 0) {
+ forcefullyStartGuaranteedContainers();
+ return;
+ }
+ // NOTE(review): 'available' is mutated below; confirm the tracker returns a copy.
+ Resource available = utilizationTracker.getAvailableResources();
+
+ // Start guaranteed containers that are queued, if resources available.
+ boolean allGuaranteedContainersLaunched =
+ startGuaranteedContainers(available);
+ // Start OPPORTUNISTIC containers with whatever is left, but only once
+ // every queued GUARANTEED container has been launched.
+ if (allGuaranteedContainersLaunched) {
+ startOpportunisticContainers(available);
+ } else {
+ // If not all guaranteed containers in queue are launched, we may need
+ // to reclaim resources from opportunistic containers that are running.
+ if (reclaimOpportunisticResources) {
+ reclaimOpportunisticContainerResources();
+ }
}
}
- private boolean startContainers(
- Collection<Container> containersToBeStarted, boolean force) {
- Iterator<Container> cIter = containersToBeStarted.iterator();
+ /**
+ * Try to launch as many GUARANTEED containers as possible.
+ * @param available resources available to launch containers; decremented in place
+ * @return true if all queued GUARANTEED containers are launched
+ * or there are no GUARANTEED containers to launch
+ */
+ private boolean startGuaranteedContainers(Resource available) {
+ Iterator<Container> cIter =
+ queuedGuaranteedContainers.values().iterator();
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
- if (tryStartContainer(container, force)) {
+ if (isResourceAvailable(available, container)) {
+ startContainer(container);
+ Resources.subtractFrom(available, container.getResource());
cIter.remove();
+ Resources.subtractFrom(
+ guaranteedResourcesDemanded, container.getResource());
} else {
resourcesAvailable = false;
}
@@ -410,25 +477,53 @@ public class ContainerScheduler extends AbstractService implements
return resourcesAvailable;
}
- private boolean tryStartContainer(Container container, boolean force) {
- boolean containerStarted = false;
- // call startContainer without checking available resource when force==true
- if (force || resourceAvailableToStartContainer(
- container)) {
+ /**
+ * Launch all queued GUARANTEED containers without checking resource
+ * availability. This is an optimization in cases where OPPORTUNISTIC
+ * containers are not allowed on the node.
+ */
+ private void forcefullyStartGuaranteedContainers() {
+ Iterator<Container> cIter =
+ queuedGuaranteedContainers.values().iterator();
+ while (cIter.hasNext()) {
+ Container container = cIter.next();
startContainer(container);
- containerStarted = true;
+ cIter.remove();
+ Resources.subtractFrom(
+ guaranteedResourcesDemanded, container.getResource());
}
- return containerStarted;
}
-
/**
- * Check if there is resource available to start a given container
- * immediately. (This can be extended to include overallocated resources)
- * @param container the container to start
- * @return true if container can be launched directly
+ * Try to launch as many OPPORTUNISTIC containers as possible.
+ * @param available resources available to launch containers; decremented in place
+ * @return true if all queued OPPORTUNISTIC containers are launched
+ * or there are no OPPORTUNISTIC containers to launch
*/
- private boolean resourceAvailableToStartContainer(Container container) {
- return this.utilizationTracker.hasResourcesAvailable(container);
+ private boolean startOpportunisticContainers(Resource available) {
+ Iterator<Container> cIter =
+ queuedOpportunisticContainers.values().iterator();
+ boolean resourcesAvailable = true;
+ while (cIter.hasNext() && resourcesAvailable) {
+ Container container = cIter.next();
+ if (isResourceAvailable(available, container)) {
+ startContainer(container);
+ Resources.subtractFrom(available, container.getResource());
+ cIter.remove();
+ } else {
+ resourcesAvailable = false;
+ }
+ }
+ return resourcesAvailable;
+ }
+
+ /**
+ * Check whether the container's memory and vcores both fit within the given
+ * resources. Other resource types are not compared.
+ */
+ private static boolean isResourceAvailable(
+ Resource resource, Container container) {
+ Resource left = Resources.subtract(resource, container.getResource());
+ return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0;
}
private boolean enqueueContainer(Container container) {
@@ -438,6 +529,7 @@ public class ContainerScheduler extends AbstractService implements
boolean isQueued;
if (isGuaranteedContainer) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
+ Resources.addTo(guaranteedResourcesDemanded, container.getResource());
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
@@ -482,18 +574,9 @@ public class ContainerScheduler extends AbstractService implements
// enough number of opportunistic containers.
if (isGuaranteedContainer) {
enqueueContainer(container);
-
- // When opportunistic container not allowed (which is determined by
- // max-queue length of pending opportunistic containers <= 0), start
- // guaranteed containers without looking at available resources.
- boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
- startPendingContainers(forceStartGuaranteedContainers);
-
- // if the guaranteed container is queued, we need to preempt opportunistic
- // containers for make room for it
- if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
- reclaimOpportunisticContainerResources(container);
- }
+ // Launch what fits; if GUARANTEED containers remain queued, reclaim
+ // resources from running OPPORTUNISTIC containers.
+ startPendingContainers(true);
} else {
// Given an opportunistic container, we first try to start as many queuing
// guaranteed containers as possible followed by queuing opportunistic
@@ -511,19 +592,28 @@ public class ContainerScheduler extends AbstractService implements
}
+ /**
+ * Pick running OPPORTUNISTIC containers to free resources for the queued
+ * GUARANTEED containers, then pause or kill them.
+ */
@SuppressWarnings("unchecked")
- private void reclaimOpportunisticContainerResources(Container container) {
+ private void reclaimOpportunisticContainerResources() {
List<Container> extraOppContainersToReclaim =
- pickOpportunisticContainersToReclaimResources(
- container.getContainerId());
- // Kill the opportunistic containers that were chosen.
- for (Container contToReclaim : extraOppContainersToReclaim) {
+ pickOpportunisticContainersToReclaimResources();
+ killOpportunisticContainers(extraOppContainersToReclaim);
+ }
+
+ /**
+ * Pause (when usePauseEventForPreemption is set) or kill each given
+ * OPPORTUNISTIC container, and record it together with its resource as
+ * pending release.
+ */
+ private void killOpportunisticContainers(
+ Collection<Container> containersToReclaim) {
+ for (Container contToReclaim : containersToReclaim) {
String preemptionAction = usePauseEventForPreemption == true ? "paused" :
- "resumed";
- LOG.info(
- "Container {} will be {} to start the "
- + "execution of guaranteed container {}.",
- contToReclaim.getContainerId(), preemptionAction,
- container.getContainerId());
+ "preempted";
+ LOG.info("Container {} will be {} to start the execution of guaranteed" +
+ " containers.", contToReclaim.getContainerId(), preemptionAction);
if (usePauseEventForPreemption) {
contToReclaim.sendPauseEvent(
@@ -534,6 +615,8 @@ public class ContainerScheduler extends AbstractService implements
"Container Killed to make room for Guaranteed Container.");
}
oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
+ Resources.addTo(
+ opportunisticResourcesToBeReleased, contToReclaim.getResource());
}
}
@@ -542,7 +625,7 @@ public class ContainerScheduler extends AbstractService implements
// Skip to put into runningContainers and addUtilization when recover
if (!runningContainers.containsKey(container.getContainerId())) {
runningContainers.put(container.getContainerId(), container);
- this.utilizationTracker.addContainerResources(container);
+ this.utilizationTracker.containerLaunched(container);
}
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
@@ -551,14 +634,12 @@ public class ContainerScheduler extends AbstractService implements
container.sendLaunchEvent();
}
- private List<Container> pickOpportunisticContainersToReclaimResources(
- ContainerId containerToStartId) {
+ private List<Container> pickOpportunisticContainersToReclaimResources() {
-    // The opportunistic containers that need to be killed for the
-    // given container to start.
+    // The opportunistic containers that need to be killed for the
+    // queued guaranteed containers to start.
List<Container> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
- ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
- containerToStartId);
+ ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
    // Go over the running opportunistic containers.
    // Use a descending iterator to kill more recently started containers.
@@ -577,15 +658,19 @@ public class ContainerScheduler extends AbstractService implements
continue;
}
extraOpportContainersToKill.add(runningCont);
+ // In the case of over-allocation, the running container may not
+ // release as much resources as it has requested, but we'll check
+ // again if more containers need to be killed/paused when this
+ // container is released.
ContainersMonitor.decreaseResourceUtilization(
getContainersMonitor(), resourcesToFreeUp,
runningCont.getResource());
}
}
if (!hasSufficientResources(resourcesToFreeUp)) {
- LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
- "at the moment. Opportunistic containers are in the process of" +
- "being killed to make room.", containerToStartId);
+ LOG.warn("There are no sufficient resources to start guaranteed" +
+ " containers at the moment. Opportunistic containers are in" +
+ " the process of being killed to make room.");
}
return extraOpportContainersToKill;
}
@@ -600,34 +685,42 @@ public class ContainerScheduler extends AbstractService implements
* getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
}
- private ResourceUtilization resourcesToFreeUp(
- ContainerId containerToStartId) {
+ /**
+ * Determine how many resources need to be freed up to launch the queued
+ * GUARANTEED containers. Used to determine how many running OPPORTUNISTIC
+ * containers need to be killed/paused, assuming OPPORTUNISTIC containers to
+ * be killed/paused will release the amount of resources they have requested.
+ *
+ * If the node is over-allocating itself, this may cause not enough
+ * OPPORTUNISTIC containers being killed/paused in cases where the running
+ * OPPORTUNISTIC containers are not consuming fully their resource requests.
+ * We'd check again upon container completion events to see if more running
+ * OPPORTUNISTIC containers need to be killed/paused.
+ *
+ * @return the amount of resources to reclaim for the queued GUARANTEED containers
+ */
+ private ResourceUtilization resourcesToFreeUp() {
-    // Get allocation of currently allocated containers.
+    // Start from zero; only queued GUARANTEED demand is counted below.
    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
- .newInstance(this.utilizationTracker.getCurrentUtilization());
-
- // Add to the allocation the allocation of the pending guaranteed
- // containers that will start before the current container will be started.
- for (Container container : queuedGuaranteedContainers.values()) {
- ContainersMonitor.increaseResourceUtilization(
- getContainersMonitor(), resourceAllocationToFreeUp,
- container.getResource());
- if (container.getContainerId().equals(containerToStartId)) {
- break;
- }
- }
+ .newInstance(0, 0, 0.0f);
+
+ // Add to the allocation the allocation of pending guaranteed containers.
+ ContainersMonitor.increaseResourceUtilization(getContainersMonitor(),
+ resourceAllocationToFreeUp, guaranteedResourcesDemanded);
    // These resources are being freed, likely at the behest of another
    // guaranteed container..
- for (Container container : oppContainersToKill.values()) {
- ContainersMonitor.decreaseResourceUtilization(
- getContainersMonitor(), resourceAllocationToFreeUp,
- container.getResource());
+ ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
+ resourceAllocationToFreeUp, opportunisticResourcesToBeReleased);
+
+ // Deduct resources still available (only when both memory and vcores are positive)
+ Resource availableResources = utilizationTracker.getAvailableResources();
+ if (availableResources.getVirtualCores() > 0 &&
+ availableResources.getMemorySize() > 0) {
+ ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
+ resourceAllocationToFreeUp, availableResources);
}
- // Subtract the overall node resources.
- getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
- resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 294eddf..9ad4f91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -28,5 +28,7 @@ public enum ContainerSchedulerEventType {
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
CONTAINER_PAUSED,
- RECOVERY_COMPLETED
+ RECOVERY_COMPLETED,
+ // Producer: ContainersMonitor, when over-allocation is enabled
+ SCHEDULE_CONTAINERS
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
new file mode 100644
index 0000000..58b73d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * Base class for NodeManager policies that determine how much resource is
+ * available to launch containers when over-allocation is on. Subclasses may
+ * track container launches and releases through the no-op hooks below.
+ */
+public abstract class NMAllocationPolicy {
+ // Thresholds bounding how far the node may be over-allocated.
+ protected final ResourceThresholds overAllocationThresholds;
+ // Source of container allocation and utilization figures.
+ protected final ContainersMonitor containersMonitor;
+
+ /**
+ * @param overAllocationThresholds the over-allocation thresholds to honor
+ * @param containersMonitor the monitor supplying container utilization
+ */
+ public NMAllocationPolicy(
+ ResourceThresholds overAllocationThresholds,
+ ContainersMonitor containersMonitor) {
+ this.containersMonitor = containersMonitor;
+ this.overAllocationThresholds = overAllocationThresholds;
+ }
+
+ /**
+ * Handle container launch events. Does nothing by default.
+ * @param container the container that has been launched
+ */
+ public void containerLaunched(Container container) {
+ // No-op by default; subclasses may override.
+ }
+
+ /**
+ * Handle container release events. Does nothing by default.
+ * @param container the container that has been released
+ */
+ public void containerReleased(Container container) {
+ // No-op by default; subclasses may override.
+ }
+
+ /**
+ * Get the amount of resources to launch containers when
+ * over-allocation is turned on.
+ * @return the amount of resources available to launch containers
+ */
+ public abstract Resource getAvailableResources();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 3c17eca..98d99c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -38,22 +39,20 @@ public interface ResourceUtilizationTracker {
ResourceUtilization getCurrentUtilization();
/**
- * Add Container's resources to Node Utilization.
- * @param container Container.
+ * Get the amount of resources currently available to launch containers.
+ * @return the resources currently available to launch containers
*/
- void addContainerResources(Container container);
+ Resource getAvailableResources();
/**
- * Subtract Container's resources to Node Utilization.
+ * Add Container's resources to Node Utilization upon container launch.
* @param container Container.
*/
- void subtractContainerResource(Container container);
+ void containerLaunched(Container container);
/**
- * Check if NM has resources available currently to run the container.
+ * Subtract Container's resources from Node Utilization upon container release.
* @param container Container.
- * @return True, if NM has resources available currently to run the container.
*/
- boolean hasResourcesAvailable(Container container);
-
+ void containerReleased(Container container);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
new file mode 100644
index 0000000..f486506
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * An implementation of NMAllocationPolicy that uses the latest snapshot of
+ * containers utilization to determine how much resources are available to
+ * launch containers when over-allocation is turned on.
+ */
+public class SnapshotBasedOverAllocationPolicy
+ extends NMAllocationPolicy {
+
+ public SnapshotBasedOverAllocationPolicy(
+ ResourceThresholds overAllocationThresholds,
+ ContainersMonitor containersMonitor) {
+ super(overAllocationThresholds, containersMonitor);
+ }
+
+ /**
+ * Compute the headroom between the over-allocation thresholds and the
+ * latest containers utilization snapshot.
+ * @return available memory (MB) and vcores; either may be negative when
+ * utilization already exceeds its threshold
+ */
+ @Override
+ public Resource getAvailableResources() {
+ ResourceUtilization utilization =
+ containersMonitor.getContainersUtilization(true).getUtilization();
+ // Widen to double so Math.round returns a long: Math.round(float) returns
+ // an int and would cap the threshold at Integer.MAX_VALUE bytes (~2 GB).
+ long memoryThresholdBytes = Math.round(
+ (double) overAllocationThresholds.getMemoryThreshold() *
+ containersMonitor.getPmemAllocatedForContainers());
+ // Physical memory utilization is an int in MB; widen to long before
+ // converting to bytes so that values above 2047 MB do not overflow.
+ long memoryUtilizedBytes = ((long) utilization.getPhysicalMemory()) << 20;
+ long memoryAvailable = memoryThresholdBytes - memoryUtilizedBytes;
+ int vcoreAvailable = Math.round(
+ (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) *
+ containersMonitor.getVCoresAllocatedForContainers());
+ return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
new file mode 100644
index 0000000..6f9bc82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource availability tracker that determines if there are resources
+ * available based on if there are unallocated resources or if there are
+ * un-utilized resources.
+ */
+public class UtilizationBasedResourceTracker
+    extends AllocationBasedResourceTracker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UtilizationBasedResourceTracker.class);
+
+  /**
+   * Policy that reports resources available from utilization slack. It may
+   * be {@code null}, in which case only unallocated resources count as
+   * available.
+   */
+  private final NMAllocationPolicy overAllocationPolicy;
+
+  /**
+   * Creates a tracker for the given scheduler. The over-allocation policy
+   * is taken from the scheduler's containers monitor (presumably
+   * {@code null} when over-allocation is not configured; verify against
+   * ContainersMonitor).
+   *
+   * @param scheduler the container scheduler this tracker serves
+   */
+  UtilizationBasedResourceTracker(ContainerScheduler scheduler) {
+    super(scheduler);
+    this.overAllocationPolicy =
+        getContainersMonitor().getContainerOverAllocationPolicy();
+  }
+
+  /**
+   * Records the launch in the allocation-based accounting and, if an
+   * over-allocation policy is present, notifies it as well.
+   */
+  @Override
+  public void containerLaunched(Container container) {
+    super.containerLaunched(container);
+    if (overAllocationPolicy != null) {
+      overAllocationPolicy.containerLaunched(container);
+    }
+  }
+
+  /**
+   * Records the release in the allocation-based accounting and, if an
+   * over-allocation policy is present, notifies it as well.
+   */
+  @Override
+  public void containerReleased(Container container) {
+    super.containerReleased(container);
+    if (overAllocationPolicy != null) {
+      overAllocationPolicy.containerReleased(container);
+    }
+  }
+
+  /**
+   * Returns, per resource component, the larger of the unallocated
+   * resources and the resources available according to the
+   * over-allocation policy.
+   */
+  @Override
+  public Resource getAvailableResources() {
+    Resource resourceBasedOnAllocation = getUnallocatedResources();
+    Resource resourceBasedOnUtilization =
+        getResourcesAvailableBasedOnUtilization();
+    LOG.debug("The amount of resources available based on allocation is {}"
+        + ", based on utilization is {}", resourceBasedOnAllocation,
+        resourceBasedOnUtilization);
+
+    return Resources.componentwiseMax(resourceBasedOnAllocation,
+        resourceBasedOnUtilization);
+  }
+
+  /**
+   * Get the amount of resources based on the slack between
+   * the actual utilization and desired utilization.
+   * @return the resources available per the over-allocation policy, or
+   *         {@link Resources#none()} when there is no policy
+   */
+  private Resource getResourcesAvailableBasedOnUtilization() {
+    if (overAllocationPolicy == null) {
+      return Resources.none();
+    }
+
+    return overAllocationPolicy.getAvailableResources();
+  }
+
+  /**
+   * Returns the containers utilization reported by the containers monitor.
+   * The {@code false} argument's meaning is defined by ContainersMonitor
+   * (TODO confirm: likely "do not force the latest snapshot").
+   */
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return getContainersMonitor().getContainersUtilization(false)
+        .getUtilization();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 493aa4c..92613ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -345,6 +346,40 @@ public abstract class BaseContainerManagerTest {
fStates.contains(containerStatus.getState()));
}
+  /**
+   * Waits for the container to reach {@code finalState}, polling roughly
+   * once per second for at most 20 attempts, and fails the test otherwise.
+   */
+  public static void waitForContainerSubState(
+      ContainerManagementProtocol containerManager, ContainerId containerID,
+      ContainerSubState finalState)
+      throws InterruptedException, YarnException, IOException {
+    List<ContainerSubState> targetStates = Arrays.asList(finalState);
+    waitForContainerSubState(containerManager, containerID, targetStates, 20);
+  }
+
+  /**
+   * Polls the container's sub-state, sleeping one second before each poll,
+   * until it is one of {@code finalStates} or {@code timeOutMax} polls have
+   * been made. Then asserts that the last observed sub-state is acceptable.
+   *
+   * @param containerManager protocol used to query container statuses
+   * @param containerID the container to watch
+   * @param finalStates sub-states that end the wait successfully
+   * @param timeOutMax maximum number of one-second polls
+   */
+  public static void waitForContainerSubState(
+      ContainerManagementProtocol containerManager, ContainerId containerID,
+      List<ContainerSubState> finalStates, int timeOutMax)
+      throws InterruptedException, YarnException, IOException {
+    List<ContainerId> ids = new ArrayList<>();
+    ids.add(containerID);
+    GetContainerStatusesRequest statusRequest =
+        GetContainerStatusesRequest.newInstance(ids);
+    HashSet<ContainerSubState> acceptable = new HashSet<>(finalStates);
+    ContainerSubState observed;
+    int polls = 0;
+    while (true) {
+      Thread.sleep(1000);
+      ContainerStatus status = containerManager
+          .getContainerStatuses(statusRequest).getContainerStatuses().get(0);
+      observed = status.getContainerSubState();
+      LOG.info("Waiting for container to get into one of states " + acceptable
+          + ". Current state is " + observed);
+      polls++;
+      if (acceptable.contains(observed) || polls >= timeOutMax) {
+        break;
+      }
+    }
+    LOG.info("Container state is " + observed);
+    Assert.assertTrue("ContainerSubState is not correct (timedout)",
+        acceptable.contains(observed));
+  }
+
+
public static void waitForApplicationState(
ContainerManagerImpl containerManager, ApplicationId appID,
ApplicationState finalState)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 8aee532..c071283 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -288,7 +288,7 @@ public class TestContainersMonitorResourceChange {
// will be 0.
assertEquals(
"Resource utilization must be default with MonitorThread's first run",
- 0, containersMonitor.getContainersUtilization()
+ 0, containersMonitor.getContainersUtilization(false).getUtilization()
.compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
// Verify the container utilization value. Since atleast one round is done,
@@ -303,8 +303,9 @@ public class TestContainersMonitorResourceChange {
ContainersMonitorImpl containersMonitor, int timeoutMsecs)
throws InterruptedException {
int timeWaiting = 0;
- while (0 == containersMonitor.getContainersUtilization()
- .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) {
+ while (0 == containersMonitor.getContainersUtilization(false)
+ .getUtilization().compareTo(
+ ResourceUtilization.newInstance(0, 0, 0.0f))) {
if (timeWaiting >= timeoutMsecs) {
break;
}
@@ -316,7 +317,7 @@ public class TestContainersMonitorResourceChange {
}
assertTrue("Resource utilization is not changed from second run onwards",
- 0 != containersMonitor.getContainersUtilization()
+ 0 != containersMonitor.getContainersUtilization(false).getUtilization()
.compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
new file mode 100644
index 0000000..1e8bfdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link AllocationBasedResourceTracker} class.
+ */
+public class TestAllocationBasedResourceTracker {
+
+  private ContainerScheduler mockContainerScheduler;
+
+  /**
+   * Creates a mocked scheduler whose containers monitor is a real
+   * ContainersMonitorImpl configured with 1024 MB of physical memory and
+   * 8 vcores.
+   */
+  @Before
+  public void setup() {
+    mockContainerScheduler = mock(ContainerScheduler.class);
+    ContainersMonitor containersMonitor =
+        new ContainersMonitorImpl(mock(ContainerExecutor.class),
+            mock(AsyncDispatcher.class), mock(Context.class));
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
+    conf.setInt(YarnConfiguration.NM_VCORES, 8);
+    containersMonitor.init(conf);
+    when(mockContainerScheduler.getContainersMonitor())
+        .thenReturn(containersMonitor);
+  }
+
+  /**
+   * Node has capacity for 1024 MB and 8 cores. Saturate it with two
+   * 512 MB / 4 vcore containers. Once full, the tracker's available
+   * resources should no longer cover another such container.
+   */
+  @Test
+  public void testHasResourcesAvailable() {
+    AllocationBasedResourceTracker tracker =
+        new AllocationBasedResourceTracker(mockContainerScheduler);
+    Container testContainer = mock(Container.class);
+    when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(
+          isResourcesAvailable(tracker.getAvailableResources(), testContainer));
+      tracker.containerLaunched(testContainer);
+    }
+    Assert.assertFalse(
+        isResourcesAvailable(tracker.getAvailableResources(), testContainer));
+  }
+
+  /**
+   * Checks that {@code available} covers the container's request in both
+   * memory and vcores. {@link Resource#compareTo} is deliberately not used.
+   * It yields a single ordering rather than a per-dimension fit check, so
+   * surplus memory could hide a vcore shortfall.
+   */
+  private static boolean isResourcesAvailable(
+      Resource available, Container container) {
+    Resource requested = container.getResource();
+    return available.getMemorySize() >= requested.getMemorySize()
+        && available.getVirtualCores() >= requested.getVirtualCores();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/879a5e7b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
deleted file mode 100644
index 82c2147..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the {@link AllocationBasedResourceUtilizationTracker} class.
- */
-public class TestAllocationBasedResourceUtilizationTracker {
-
- private ContainerScheduler mockContainerScheduler;
-
- @Before
- public void setup() {
- mockContainerScheduler = mock(ContainerScheduler.class);
- ContainersMonitor containersMonitor =
- new ContainersMonitorImpl(mock(ContainerExecutor.class),
- mock(AsyncDispatcher.class), mock(Context.class));
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
- conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
- conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
- conf.setInt(YarnConfiguration.NM_VCORES, 8);
- containersMonitor.init(conf);
- when(mockContainerScheduler.getContainersMonitor())
- .thenReturn(containersMonitor);
- }
-
- /**
- * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
- * hasResourceAvailable should return false.
- */
- @Test
- public void testHasResourcesAvailable() {
- AllocationBasedResourceUtilizationTracker tracker =
- new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
- Container testContainer = mock(Container.class);
- when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
- for (int i = 0; i < 2; i++) {
- Assert.assertTrue(tracker.hasResourcesAvailable(testContainer));
- tracker.addContainerResources(testContainer);
- }
- Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
- }
-
- /**
- * Test the case where the current allocation has been truncated to 0.8888891
- * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return
- * true.
- */
- @Test
- public void testHasEnoughCpu() {
- AllocationBasedResourceUtilizationTracker tracker =
- new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
- float currentAllocation = 0.8888891f;
- long totalCores = 9;
- int alreadyUsedCores = 8;
- Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores,
- (int) totalCores - alreadyUsedCores));
- Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores,
- (int) totalCores - alreadyUsedCores + 1));
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org