You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:08:01 UTC
[39/50] [abbrv] hadoop git commit: YARN-6675. Add NM support to
launch opportunistic containers based on overallocation. Contributed by Haibo
Chen.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
index 6b3ac67..f24df38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -66,8 +67,8 @@ public class TestContainerSchedulerRecovery {
@Mock private ContainerId containerId;
- @Mock private AllocationBasedResourceUtilizationTracker
- allocationBasedResourceUtilizationTracker;
+ @Mock private AllocationBasedResourceTracker
+ allocationBasedResourceTracker;
@InjectMocks private ContainerScheduler tempContainerScheduler =
new ContainerScheduler(context, dispatcher, metrics, 0);
@@ -85,12 +86,13 @@ public class TestContainerSchedulerRecovery {
MockitoAnnotations.initMocks(this);
spy = spy(tempContainerScheduler);
when(container.getContainerId()).thenReturn(containerId);
+ when(container.getResource()).thenReturn(Resource.newInstance(1024, 1));
when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
when(containerId.getApplicationAttemptId().getApplicationId())
.thenReturn(appId);
when(containerId.getContainerId()).thenReturn(123L);
- doNothing().when(allocationBasedResourceUtilizationTracker)
- .addContainerResources(container);
+ doNothing().when(allocationBasedResourceTracker)
+ .containerLaunched(container);
}
@After public void tearDown() {
@@ -112,8 +114,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(1, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as QUEUED, OPPORTUNISTIC,
@@ -132,8 +134,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as PAUSED, GUARANTEED,
@@ -152,8 +154,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(1, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as PAUSED, OPPORTUNISTIC,
@@ -172,8 +174,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as LAUNCHED, GUARANTEED,
@@ -192,8 +194,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(1))
+ .containerLaunched(container);
}
/*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC,
@@ -212,8 +214,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(1))
+ .containerLaunched(container);
}
/*Test if a container is recovered as REQUESTED, GUARANTEED,
@@ -232,8 +234,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as REQUESTED, OPPORTUNISTIC,
@@ -252,8 +254,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as COMPLETED, GUARANTEED,
@@ -272,8 +274,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as COMPLETED, OPPORTUNISTIC,
@@ -292,8 +294,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as GUARANTEED but no executionType set,
@@ -311,8 +313,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
/*Test if a container is recovered as PAUSED but no executionType set,
@@ -330,7 +332,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
+ .containerLaunched(container);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
new file mode 100644
index 0000000..384b116
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
@@ -0,0 +1,1121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.ConfigurationException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Test ContainerScheduler behaviors when NM overallocation is turned on.
+ */
+public class TestContainerSchedulerWithOverAllocation
+ extends BaseContainerManagerTest {
+ private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3;
+ private static final int NM_CONTAINERS_VCORES = 4;
+ private static final int NM_CONTAINERS_MEMORY_MB = 2048;
+
+ static {
+ LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
+ }
+
+ public TestContainerSchedulerWithOverAllocation()
+ throws UnsupportedFileSystemException {
+ }
+
+ @Override
+ protected ContainerExecutor createContainerExecutor() {
+ DefaultContainerExecutor exec =
+ new LongRunningContainerSimulatingContainerExecutor();
+ exec.setConf(conf);
+ return exec;
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(
+ DeletionService delSrvc) {
+ return new LongRunningContainerSimulatingContainersManager(
+ context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler, user);
+ }
+
+ @Override
+ public void setup() throws IOException {
+ conf.setInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+ NM_OPPORTUNISTIC_QUEUE_LIMIT);
+ conf.setFloat(
+ YarnConfiguration.NM_OVERALLOCATION_CPU_UTILIZATION_THRESHOLD,
+ 0.75f);
+ conf.setFloat(
+ YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
+ 0.75f);
+ super.setup();
+ }
+
+ /**
+ * Start one GUARANTEED and one OPPORTUNISTIC container, which in aggregate do
+ * not exceed the capacity of the node. Both containers are expected to start
+ * running immediately.
+ */
+ @Test
+ public void testStartMultipleContainersWithoutOverallocation()
+ throws Exception {
+ containerManager.start();
+
+ StartContainersRequest allRequests = StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() { {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), true));
+ } }
+ );
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start one GUARANTEED and one OPPORTUNISTIC containers whose utilization
+ * is very low relative to their resource request, resulting in a low node
+ * utilization. Then start another OPPORTUNISTIC containers which requests
+ * more than what's left unallocated on the node. Due to overallocation
+ * being turned on and node utilization being low, the second OPPORTUNISTIC
+ * container is also expected to be launched immediately.
+ */
+ @Test
+ public void testStartOppContainersWithPartialOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), true));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current containers utilization is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(512, 0, 1.0f/8));
+
+ // start a container that requests more than what's left unallocated
+ // 512 + 1024 + 824 > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false))
+ ));
+
+ // this container is expected to be started immediately because there
+ // are (memory: 1024, vcore: 0.625) available based on over-allocation
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start one GUARANTEED and one OPPORTUNISTIC containers which utilizes most
+ * of the resources they requested, resulting in a high node utilization.
+ * Then start another OPPORTUNISTIC containers which requests more than what's
+ * left unallocated on the node. Because of the high resource utilization on
+ * the node, the projected utilization, if we were to start the second
+ * OPPORTUNISTIC container immediately, will go over the NM overallocation
+ * threshold, so the second OPPORTUNISTIC container is expected to be queued.
+ */
+ @Test
+ public void testQueueOppContainerWithPartialOverallocationHighUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), true));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ // the containers utilization is high
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1500, 0, 1.0f/8));
+
+ // start a container that requests more than what's left unallocated
+ // 512 + 1024 + 824 > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false))
+ ));
+ // this container will not start immediately because there is not
+ // enough resource available at the moment either in terms of
+ // resources unallocated or in terms of the actual utilization
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.SCHEDULED);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.SCHEDULED);
+ }
+ });
+ }
+
+ /**
+ * Start two GUARANTEED containers which in aggregate takes up the whole node
+ * capacity, yet whose utilization is low relative to their resource request,
+ * resulting in a low node resource utilization. Then try to start another
+ * OPPORTUNISTIC containers. Because the resource utilization across the node
+ * is low and overallocation being turned on, the OPPORTUNISTIC container is
+ * expected to be launched immediately even though there is no resources left
+ * unallocated.
+ */
+ @Test
+ public void testStartOppContainersWithOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current containers utilization is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(800, 0, 1.0f/8));
+
+ // start a container when there is no resources left unallocated.
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false))
+ ));
+
+ // this container is expected to be started because there is resources
+ // available because the actual utilization is very low
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+
+ /**
+ * Start two GUARANTEED containers which in aggregate take up the whole node
+ * capacity and fully utilize the resources they requested. Then try to start
+ * four OPPORTUNISTIC containers of which three will be queued and one will be
+ * killed because of the max queue length is 3.
+ */
+ @Test
+ public void testQueueOppContainersWithFullUtilization() throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the containers are fully utilizing their resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(2048, 0, 1.0f/8));
+
+ // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container
+ // queue can hold when there is no unallocated resource left.
+ List<StartContainerRequest> moreContainerRequests =
+ new ArrayList<>(NM_OPPORTUNISTIC_QUEUE_LIMIT + 1);
+ for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) {
+ moreContainerRequests.add(
+ createStartContainerRequest(2 + a,
+ BuilderUtils.newResource(512, 1), false));
+ }
+ containerManager.startContainers(
+ StartContainersRequest.newInstance(moreContainerRequests));
+
+ // All OPPORTUNISTIC containers but the last one should be queued.
+ // The last OPPORTUNISTIC container to launch should be killed.
+ BaseContainerManagerTest.waitForContainerState(
+ containerManager, createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2),
+ ContainerState.COMPLETE);
+
+ HashMap<ContainerId, ContainerSubState> expectedContainerStatus =
+ new HashMap<>();
+ expectedContainerStatus.put(
+ createContainerId(0), ContainerSubState.RUNNING);
+ expectedContainerStatus.put(
+ createContainerId(1), ContainerSubState.RUNNING);
+ expectedContainerStatus.put(
+ createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT),
+ ContainerSubState.DONE);
+ for (int i = 0; i < NM_OPPORTUNISTIC_QUEUE_LIMIT; i++) {
+ expectedContainerStatus.put(
+ createContainerId(i + 2), ContainerSubState.SCHEDULED);
+ }
+ verifyContainerStatuses(expectedContainerStatus);
+ }
+
+ /**
+ * Start two GUARANTEED containers that together does not take up the
+ * whole node. Then try to start one OPPORTUNISTIC container that will
+ * fit into the remaining unallocated space on the node.
+ * The OPPORTUNISTIC container is expected to start even though the
+ * current node utilization is above the NM overallocation threshold,
+ * because it's always safe to launch containers as long as the node
+ * has not been fully allocated.
+ */
+ @Test
+ public void testStartOppContainerWithHighUtilizationNoOverallocation()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1200, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(400, 1), true));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // containers utilization is above the over-allocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1600, 0, 1.0f/2));
+
+ // start a container that can just fit in the remaining unallocated space
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(400, 1), false))
+ ));
+
+ // the OPPORTUNISTIC container can be safely launched even though
+ // the container utilization is above the NM overallocation threshold
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers first whose utilization is low relative
+ * to the resources they requested, resulting in a low node utilization. Then
+ * try to start a GUARANTEED container which requests more than what's left
+ * unallocated on the node. Because the node utilization is low and NM
+ * overallocation is turned on, the GUARANTEED container is expected to be
+ * started immediately without killing any running OPPORTUNISTIC containers.
+ */
+ @Test
+ public void testKillNoOppContainersWithPartialOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // containers utilization is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(512, 0, 1.0f/8));
+
+ // start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 824) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected be launched immediately without
+ // killing any OPPORTUNISTIC containers.
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers whose utilization will be high relative
+ * to the resources they requested, resulting in a high node utilization.
+ * Then try to start a GUARANTEED container which requests more than what's
+ * left unallocated on the node. Because the node is under high utilization,
+ * the second OPPORTUNISTIC container is expected to be killed in order to
+ * make room for the GUARANTEED container.
+ */
+ @Test
+ public void testKillOppContainersWithPartialOverallocationHighUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the containers utilization is very high
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1800, 0, 1.0f/8));
+
+ // start a GUARANTEED container that requests more than what's left
+ // unallocated on the node 512 + 1024 + 824 > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+ // the last launched OPPORTUNISTIC container is expected to be killed
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.DONE);
+
+ GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
+ newInstance(new ArrayList<ContainerId>() {
+ {
+ add(createContainerId(0));
+ add(createContainerId(1));
+ add(createContainerId(2));
+ }
+ });
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
+ if (status.getContainerId().equals(createContainerId(1))) {
+ Assert.assertTrue(status.getDiagnostics().contains(
+ "Container Killed to make room for Guaranteed Container"));
+ } else {
+ Assert.assertEquals(status.getContainerId() + " is not RUNNING",
+ ContainerSubState.RUNNING, status.getContainerSubState());
+ }
+ System.out.println("\nStatus : [" + status + "]\n");
+ }
+ }
+
+
+ /**
+ * Start three OPPORTUNISTIC containers which in aggregates exceeds the
+ * capacity of the node, yet whose utilization is low relative
+ * to the resources they requested, resulting in a low node utilization.
+ * Then try to start a GUARANTEED container. Even though the node has
+ * nothing left unallocated, it is expected to start immediately
+ * without killing any running OPPORTUNISTIC containers because the node
+ * utilization is very low and overallocation is turned on.
+ */
+ @Test
+ public void testKillNoOppContainersWithOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(1024, 1), false));
+ }
+ }
+ ));
+ // All three GUARANTEED containers are all expected to start
+ // because the containers utilization is low (0 at the point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // the containers utilization is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1024, 0, 1.0f/8));
+
+ // start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 824) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected be launched immediately without
+ // killing any OPPORTUNISTIC containers
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ put(createContainerId(3), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start four OPPORTUNISTIC containers which in aggregates exceeds the
+ * capacity of the node. The real resource utilization of the first two
+ * OPPORTUNISTIC containers are high whereas that of the latter two are
+ * almost zero. Then try to start a GUARANTEED container. The GUARANTEED
+ * container will eventually start running after preempting the third
+ * and fourth OPPORTUNISTIC container (which releases no resources) and
+ * then the second OPPORTUNISTIC container.
+ */
+ @Test
+ public void
+ testKillOppContainersConservativelyWithOverallocationHighUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(3,
+ BuilderUtils.newResource(1024, 1), false));
+ }
+ }
+ ));
+ // All four GUARANTEED containers are all expected to start
+ // because the containers utilization is low (0 at the point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+
+ // the containers utilization is at the overallocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1536, 0, 1.0f/2));
+
+ // try to start a GUARANTEED container when there's nothing left unallocated
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(4,
+ BuilderUtils.newResource(1024, 1), true))
+ ));
+
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(4), ContainerSubState.RUNNING);
+ GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
+ newInstance(new ArrayList<ContainerId>() {
+ {
+ add(createContainerId(0));
+ add(createContainerId(1));
+ add(createContainerId(2));
+ add(createContainerId(3));
+ add(createContainerId(4));
+ }
+ });
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
+ if (status.getContainerId().equals(createContainerId(0)) ||
+ status.getContainerId().equals(createContainerId(4))) {
+ Assert.assertEquals(
+ ContainerSubState.RUNNING, status.getContainerSubState());
+ } else {
+ Assert.assertTrue(status.getDiagnostics().contains(
+ "Container Killed to make room for Guaranteed Container"));
+ }
+ System.out.println("\nStatus : [" + status + "]\n");
+ }
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
+ * which in aggregate exceeds the capacity of the node. The first two
+ * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
+ * one utilizes nearly all of its resource requested. Then try to start two
+ * more OPPORTUNISTIC containers. The two OPPORTUNISTIC containers are
+ * expected to be queued immediately. Upon the completion of the
+ * resource-usage-heavy GUARANTEED container, both OPPORTUNISTIC containers
+ * are expected to start.
+ */
+ @Test
+ public void testStartOppContainersUponContainerCompletion() throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+
+ // All three containers are all expected to start immediately
+ // because the node utilization is low (0 at the point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // the contianers utilization is at the overallocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1536, 0, 1.0f/2));
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(4,
+ BuilderUtils.newResource(512, 1), false));
+ }
+ }
+ ));
+ // the two new OPPORTUNISTIC containers are expected to be queued
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(3), ContainerSubState.SCHEDULED);
+ put(createContainerId(4), ContainerSubState.SCHEDULED);
+ }
+ });
+
+ // the GUARANTEED container is completed releasing resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(100, 0, 1.0f/5));
+ allowContainerToSucceed(2);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.DONE);
+
+ // the two OPPORTUNISTIC containers are expected to start together
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(4), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.DONE);
+ put(createContainerId(3), ContainerSubState.RUNNING);
+ put(createContainerId(4), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start one GUARANTEED container that consumes all the resources on the
+ * node and keeps running, followed by two OPPORTUNISTIC containers that
+ * will be queued forever because there is no containers starting or
+ * finishing. Then try to start OPPORTUNISTIC containers out of band.
+ */
+ @Test
+ public void testStartOpportunisticContainersOutOfBand() throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(0,
+ BuilderUtils.newResource(2048, 4), true))));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+
+ // the container is fully utilizing its resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(2048, 0, 1.0f));
+
+ // send two OPPORTUNISTIC container requests that are expected to be queued
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.SCHEDULED);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.SCHEDULED);
+
+ // the containers utilization dropped to the overallocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1536, 0, 1.0f/2));
+
+ // try to start containers out of band.
+ ((LongRunningContainerSimulatingContainersManager)containerManager)
+ .startContainersOutOfBandUponLowUtilization();
+
+ // no containers in queue are expected to be launched because the
+ // containers utilization is not below the over-allocation threshold
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.SCHEDULED);
+ put(createContainerId(2), ContainerSubState.SCHEDULED);
+ }
+ });
+
+ // the GUARANTEED container is completed releasing resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(100, 0, 1.0f/5));
+
+ // the containers utilization dropped way below the overallocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(512, 0, 1.0f/8));
+
+ ((LongRunningContainerSimulatingContainersManager)containerManager)
+ .startContainersOutOfBandUponLowUtilization();
+
+ // the two OPPORTUNISTIC containers are expected to be launched
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+
+ private void setContainerResourceUtilization(ResourceUtilization usage) {
+ ((ContainerMonitorForOverallocationTest)
+ containerManager.getContainersMonitor())
+ .setContainerResourceUsage(usage);
+ }
+
+ private void allowContainerToSucceed(int containerId) {
+ ((LongRunningContainerSimulatingContainerExecutor) this.exec)
+ .containerSucceeded(createContainerId(containerId));
+ }
+
+
+ protected StartContainerRequest createStartContainerRequest(int containerId,
+ Resource resource, boolean isGuaranteed) throws IOException {
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ ExecutionType executionType = isGuaranteed ? ExecutionType.GUARANTEED :
+ ExecutionType.OPPORTUNISTIC;
+ Token containerToken = createContainerToken(
+ createContainerId(containerId),
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
+ context.getContainerTokenSecretManager(),
+ null, executionType);
+
+ return StartContainerRequest.newInstance(
+ containerLaunchContext, containerToken);
+ }
+
+ protected void verifyContainerStatuses(
+ Map<ContainerId, ContainerSubState> expected)
+ throws IOException, YarnException {
+ List<ContainerId> statList = new ArrayList<>(expected.keySet());
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+
+ for (ContainerStatus status : containerStatuses) {
+ ContainerId containerId = status.getContainerId();
+ Assert.assertEquals(containerId + " is in unexpected state",
+ expected.get(containerId), status.getContainerSubState());
+ }
+ }
+
+ /**
+ * A container manager that sends a dummy container pid while it's cleaning
+ * up running containers. Used along with
+ * LongRunningContainerSimulatingContainerExecutor to simulate long running
+ * container processes for testing purposes.
+ */
+ private static class LongRunningContainerSimulatingContainersManager
+ extends ContainerManagerImpl {
+
+ private final String user;
+
+ LongRunningContainerSimulatingContainersManager(
+ Context context, ContainerExecutor exec,
+ DeletionService deletionContext,
+ NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics,
+ LocalDirsHandlerService dirsHandler, String user) {
+ super(context, exec, deletionContext,
+ nodeStatusUpdater, metrics, dirsHandler);
+ this.user = user;
+ }
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
+ /**
+ * Create a container launcher that signals container processes
+ * with a dummy pid. The container processes are simulated in
+ * LongRunningContainerSimulatingContainerExecutor which does
+ * not write a pid file on behalf of containers to launch, so
+ * the pid does not matter.
+ */
+ @Override
+ protected ContainersLauncher createContainersLauncher(
+ Context context, ContainerExecutor exec) {
+ ContainerManagerImpl containerManager = this;
+ return new ContainersLauncher(context, dispatcher, exec, dirsHandler,
+ this) {
+ @Override
+ protected ContainerLaunch createContainerLaunch(
+ Application app, Container container) {
+ return new ContainerLaunch(context, getConfig(), dispatcher,
+ exec, app, container, dirsHandler, containerManager) {
+ @Override
+ protected String getContainerPid(Path pidFilePath)
+ throws Exception {
+ return "123";
+ }
+
+ };
+ }
+ };
+ }
+
+ @Override
+ protected ContainersMonitor createContainersMonitor(
+ ContainerExecutor exec) {
+ return new ContainerMonitorForOverallocationTest(exec,
+ dispatcher, context);
+ }
+
+ public void startContainersOutOfBandUponLowUtilization() {
+ ((ContainerMonitorForOverallocationTest) getContainersMonitor())
+ .attemptToStartContainersUponLowUtilization();
+ }
+ }
+
+ /**
+ * A container executor that simulates long running container processes
+ * by having container launch threads sleep infinitely until it's given
+ * a signal to finish with either a success or failure exit code.
+ */
+ private static class LongRunningContainerSimulatingContainerExecutor
+ extends DefaultContainerExecutor {
+ private ConcurrentHashMap<ContainerId, ContainerFinishLatch> containers =
+ new ConcurrentHashMap<>();
+
+ public void containerSucceeded(ContainerId containerId) {
+ ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+ if (containerFinishLatch != null) {
+ containerFinishLatch.toSucceed();
+ }
+ }
+
+ public void containerFailed(ContainerId containerId) {
+ ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+ if (containerFinishLatch != null) {
+ containerFinishLatch.toFail();
+ }
+ }
+
+ /**
+ * Simulate long running container processes by having container launcher
+ * threads wait infinitely for a signal to finish.
+ */
+ @Override
+ public int launchContainer(ContainerStartContext ctx)
+ throws IOException, ConfigurationException {
+ ContainerId container = ctx.getContainer().getContainerId();
+ containers.putIfAbsent(container, new ContainerFinishLatch(container));
+
+ // simulate a long running container process by having the
+ // container launch thread sleep forever until it's given a
+ // signal to finish with a exit code.
+ while (!containers.get(container).toProceed) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return -1;
+ }
+ }
+
+ return containers.get(container).getContainerExitCode();
+ }
+
+ /**
+ * Override signalContainer() so that simulated container processes
+ * are properly cleaned up.
+ */
+ @Override
+ public boolean signalContainer(ContainerSignalContext ctx)
+ throws IOException {
+ containerSucceeded(ctx.getContainer().getContainerId());
+ return true;
+ }
+
+ /**
+ * A signal that container launch threads wait for before exiting
+ * in order to simulate long running container processes.
+ */
+ private static final class ContainerFinishLatch {
+ volatile boolean toProceed;
+ int exitCode;
+ ContainerId container;
+
+ ContainerFinishLatch(ContainerId containerId) {
+ exitCode = 0;
+ toProceed = false;
+ container = containerId;
+ }
+
+ void toSucceed() {
+ exitCode = 0;
+ toProceed = true;
+ }
+
+ void toFail() {
+ exitCode = -101;
+ toProceed = true;
+ }
+
+ int getContainerExitCode() {
+ // read barrier of toProceed to make sure the exit code is not stale
+ if (toProceed) {
+ LOG.debug(container + " finished with exit code: " + exitCode);
+ }
+ return exitCode;
+ }
+ }
+ }
+
+ /**
+ * A test implementation of container monitor that allows control of
+ * current resource utilization.
+ */
+ private static class ContainerMonitorForOverallocationTest
+ extends ContainersMonitorImpl {
+
+ private ResourceUtilization containerResourceUsage =
+ ResourceUtilization.newInstance(0, 0, 0.0f);
+
+ ContainerMonitorForOverallocationTest(ContainerExecutor exec,
+ AsyncDispatcher dispatcher, Context context) {
+ super(exec, dispatcher, context);
+ }
+
+ @Override
+ public long getPmemAllocatedForContainers() {
+ return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
+ }
+
+ @Override
+ public long getVmemAllocatedForContainers() {
+ float pmemRatio = getConfig().getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ return (long) (pmemRatio * getPmemAllocatedForContainers());
+ }
+
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ return NM_CONTAINERS_VCORES;
+ }
+
+ @Override
+ public ContainersResourceUtilization getContainersUtilization(
+ boolean latest) {
+ return new ContainersMonitor.ContainersResourceUtilization(
+ containerResourceUsage, System.currentTimeMillis());
+ }
+
+ public void setContainerResourceUsage(
+ ResourceUtilization containerResourceUsage) {
+ this.containerResourceUsage = containerResourceUsage;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org