You are viewing a plain text version of this content. The hyperlink to its canonical (HTML) version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/08 20:50:23 UTC
[1/2] hadoop git commit: Revert "YARN-6675. Add NM support to launch
opportunistic containers based on overallocation. Contributed by Haibo Chen."
Repository: hadoop
Updated Branches:
refs/heads/YARN-1011 c1362b68a -> f9a7055b9
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
index 2cef53a..2ae8b97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -64,8 +63,8 @@ public class TestContainerSchedulerRecovery {
@Mock private ContainerId containerId;
- @Mock private AllocationBasedResourceTracker
- allocationBasedResourceTracker;
+ @Mock private AllocationBasedResourceUtilizationTracker
+ allocationBasedResourceUtilizationTracker;
@InjectMocks private ContainerScheduler tempContainerScheduler =
new ContainerScheduler(context, dispatcher, metrics, 0);
@@ -76,13 +75,12 @@ public class TestContainerSchedulerRecovery {
MockitoAnnotations.initMocks(this);
spy = spy(tempContainerScheduler);
when(container.getContainerId()).thenReturn(containerId);
- when(container.getResource()).thenReturn(Resource.newInstance(1024, 1));
when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
when(containerId.getApplicationAttemptId().getApplicationId())
.thenReturn(appId);
when(containerId.getContainerId()).thenReturn(123L);
- doNothing().when(allocationBasedResourceTracker)
- .containerLaunched(container);
+ doNothing().when(allocationBasedResourceUtilizationTracker)
+ .addContainerResources(container);
}
@After public void tearDown() {
@@ -103,8 +101,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(1, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as QUEUED, OPPORTUNISTIC,
@@ -122,8 +120,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as PAUSED, GUARANTEED,
@@ -141,8 +139,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(1, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as PAUSED, OPPORTUNISTIC,
@@ -160,8 +158,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as LAUNCHED, GUARANTEED,
@@ -179,8 +177,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(1))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
+ .addContainerResources(container);
}
/*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC,
@@ -198,8 +196,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(1))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
+ .addContainerResources(container);
}
/*Test if a container is recovered as REQUESTED, GUARANTEED,
@@ -217,8 +215,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as REQUESTED, OPPORTUNISTIC,
@@ -236,8 +234,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as COMPLETED, GUARANTEED,
@@ -255,8 +253,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as COMPLETED, OPPORTUNISTIC,
@@ -274,8 +272,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as GUARANTEED but no executionType set,
@@ -292,8 +290,8 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
/*Test if a container is recovered as PAUSED but no executionType set,
@@ -310,7 +308,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
- Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
- .containerLaunched(container);
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
deleted file mode 100644
index 384b116..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
+++ /dev/null
@@ -1,1121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ContainerSubState;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.exceptions.ConfigurationException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
-import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Test ContainerScheduler behaviors when NM overallocation is turned on.
- */
-public class TestContainerSchedulerWithOverAllocation
- extends BaseContainerManagerTest {
- private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3;
- private static final int NM_CONTAINERS_VCORES = 4;
- private static final int NM_CONTAINERS_MEMORY_MB = 2048;
-
- static {
- LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
- }
-
- public TestContainerSchedulerWithOverAllocation()
- throws UnsupportedFileSystemException {
- }
-
- @Override
- protected ContainerExecutor createContainerExecutor() {
- DefaultContainerExecutor exec =
- new LongRunningContainerSimulatingContainerExecutor();
- exec.setConf(conf);
- return exec;
- }
-
- @Override
- protected ContainerManagerImpl createContainerManager(
- DeletionService delSrvc) {
- return new LongRunningContainerSimulatingContainersManager(
- context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler, user);
- }
-
- @Override
- public void setup() throws IOException {
- conf.setInt(
- YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
- NM_OPPORTUNISTIC_QUEUE_LIMIT);
- conf.setFloat(
- YarnConfiguration.NM_OVERALLOCATION_CPU_UTILIZATION_THRESHOLD,
- 0.75f);
- conf.setFloat(
- YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
- 0.75f);
- super.setup();
- }
-
- /**
- * Start one GUARANTEED and one OPPORTUNISTIC container, which in aggregate do
- * not exceed the capacity of the node. Both containers are expected to start
- * running immediately.
- */
- @Test
- public void testStartMultipleContainersWithoutOverallocation()
- throws Exception {
- containerManager.start();
-
- StartContainersRequest allRequests = StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() { {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(1024, 1), true));
- } }
- );
- containerManager.startContainers(allRequests);
-
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(1), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- }
- });
- }
-
- /**
- * Start one GUARANTEED and one OPPORTUNISTIC containers whose utilization
- * is very low relative to their resource request, resulting in a low node
- * utilization. Then start another OPPORTUNISTIC containers which requests
- * more than what's left unallocated on the node. Due to overallocation
- * being turned on and node utilization being low, the second OPPORTUNISTIC
- * container is also expected to be launched immediately.
- */
- @Test
- public void testStartOppContainersWithPartialOverallocationLowUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), true));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(824, 1), true));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(1), ContainerSubState.RUNNING);
-
- // the current containers utilization is low
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(512, 0, 1.0f/8));
-
- // start a container that requests more than what's left unallocated
- // 512 + 1024 + 824 > 2048
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), false))
- ));
-
- // this container is expected to be started immediately because there
- // are (memory: 1024, vcore: 0.625) available based on over-allocation
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(2), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.RUNNING);
- }
- });
- }
-
- /**
- * Start one GUARANTEED and one OPPORTUNISTIC containers which utilizes most
- * of the resources they requested, resulting in a high node utilization.
- * Then start another OPPORTUNISTIC containers which requests more than what's
- * left unallocated on the node. Because of the high resource utilization on
- * the node, the projected utilization, if we were to start the second
- * OPPORTUNISTIC container immediately, will go over the NM overallocation
- * threshold, so the second OPPORTUNISTIC container is expected to be queued.
- */
- @Test
- public void testQueueOppContainerWithPartialOverallocationHighUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), true));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(824, 1), true));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(
- containerManager, createContainerId(1), ContainerSubState.RUNNING);
-
- // the containers utilization is high
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1500, 0, 1.0f/8));
-
- // start a container that requests more than what's left unallocated
- // 512 + 1024 + 824 > 2048
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), false))
- ));
- // this container will not start immediately because there is not
- // enough resource available at the moment either in terms of
- // resources unallocated or in terms of the actual utilization
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.SCHEDULED);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.SCHEDULED);
- }
- });
- }
-
- /**
- * Start two GUARANTEED containers which in aggregate takes up the whole node
- * capacity, yet whose utilization is low relative to their resource request,
- * resulting in a low node resource utilization. Then try to start another
- * OPPORTUNISTIC containers. Because the resource utilization across the node
- * is low and overallocation being turned on, the OPPORTUNISTIC container is
- * expected to be launched immediately even though there is no resources left
- * unallocated.
- */
- @Test
- public void testStartOppContainersWithOverallocationLowUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), true));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(1024, 1), true));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
-
- // the current containers utilization is low
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(800, 0, 1.0f/8));
-
- // start a container when there is no resources left unallocated.
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), false))
- ));
-
- // this container is expected to be started because there is resources
- // available because the actual utilization is very low
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.RUNNING);
- }
- });
- }
-
-
- /**
- * Start two GUARANTEED containers which in aggregate take up the whole node
- * capacity and fully utilize the resources they requested. Then try to start
- * four OPPORTUNISTIC containers of which three will be queued and one will be
- * killed because of the max queue length is 3.
- */
- @Test
- public void testQueueOppContainersWithFullUtilization() throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), true));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(1024, 1), true));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
-
- // the containers are fully utilizing their resources
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(2048, 0, 1.0f/8));
-
- // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container
- // queue can hold when there is no unallocated resource left.
- List<StartContainerRequest> moreContainerRequests =
- new ArrayList<>(NM_OPPORTUNISTIC_QUEUE_LIMIT + 1);
- for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) {
- moreContainerRequests.add(
- createStartContainerRequest(2 + a,
- BuilderUtils.newResource(512, 1), false));
- }
- containerManager.startContainers(
- StartContainersRequest.newInstance(moreContainerRequests));
-
- // All OPPORTUNISTIC containers but the last one should be queued.
- // The last OPPORTUNISTIC container to launch should be killed.
- BaseContainerManagerTest.waitForContainerState(
- containerManager, createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2),
- ContainerState.COMPLETE);
-
- HashMap<ContainerId, ContainerSubState> expectedContainerStatus =
- new HashMap<>();
- expectedContainerStatus.put(
- createContainerId(0), ContainerSubState.RUNNING);
- expectedContainerStatus.put(
- createContainerId(1), ContainerSubState.RUNNING);
- expectedContainerStatus.put(
- createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT),
- ContainerSubState.DONE);
- for (int i = 0; i < NM_OPPORTUNISTIC_QUEUE_LIMIT; i++) {
- expectedContainerStatus.put(
- createContainerId(i + 2), ContainerSubState.SCHEDULED);
- }
- verifyContainerStatuses(expectedContainerStatus);
- }
-
- /**
- * Start two GUARANTEED containers that together does not take up the
- * whole node. Then try to start one OPPORTUNISTIC container that will
- * fit into the remaining unallocated space on the node.
- * The OPPORTUNISTIC container is expected to start even though the
- * current node utilization is above the NM overallocation threshold,
- * because it's always safe to launch containers as long as the node
- * has not been fully allocated.
- */
- @Test
- public void testStartOppContainerWithHighUtilizationNoOverallocation()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1200, 1), true));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(400, 1), true));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
-
- // containers utilization is above the over-allocation threshold
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1600, 0, 1.0f/2));
-
- // start a container that can just fit in the remaining unallocated space
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(2,
- BuilderUtils.newResource(400, 1), false))
- ));
-
- // the OPPORTUNISTIC container can be safely launched even though
- // the container utilization is above the NM overallocation threshold
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.RUNNING);
- }
- });
- }
-
- /**
- * Start two OPPORTUNISTIC containers first whose utilization is low relative
- * to the resources they requested, resulting in a low node utilization. Then
- * try to start a GUARANTEED container which requests more than what's left
- * unallocated on the node. Because the node utilization is low and NM
- * overallocation is turned on, the GUARANTEED container is expected to be
- * started immediately without killing any running OPPORTUNISTIC containers.
- */
- @Test
- public void testKillNoOppContainersWithPartialOverallocationLowUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(824, 1), false));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
-
- // containers utilization is low
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(512, 0, 1.0f/8));
-
- // start a GUARANTEED container that requests more than what's left
- // unallocated on the node: (512 + 1024 + 824) > 2048
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), true))
- ));
-
- // the GUARANTEED container is expected be launched immediately without
- // killing any OPPORTUNISTIC containers.
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.RUNNING);
- }
- });
- }
-
- /**
- * Start two OPPORTUNISTIC containers whose utilization will be high relative
- * to the resources they requested, resulting in a high node utilization.
- * Then try to start a GUARANTEED container which requests more than what's
- * left unallocated on the node. Because the node is under high utilization,
- * the second OPPORTUNISTIC container is expected to be killed in order to
- * make room for the GUARANTEED container.
- */
- @Test
- public void testKillOppContainersWithPartialOverallocationHighUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(824, 1), false));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
-
- // the containers utilization is very high
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1800, 0, 1.0f/8));
-
- // start a GUARANTEED container that requests more than what's left
- // unallocated on the node 512 + 1024 + 824 > 2048
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), true))
- ));
-
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
- // the last launched OPPORTUNISTIC container is expected to be killed
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.DONE);
-
- GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
- newInstance(new ArrayList<ContainerId>() {
- {
- add(createContainerId(0));
- add(createContainerId(1));
- add(createContainerId(2));
- }
- });
- List<ContainerStatus> containerStatuses = containerManager
- .getContainerStatuses(statRequest).getContainerStatuses();
- for (ContainerStatus status : containerStatuses) {
- if (status.getContainerId().equals(createContainerId(1))) {
- Assert.assertTrue(status.getDiagnostics().contains(
- "Container Killed to make room for Guaranteed Container"));
- } else {
- Assert.assertEquals(status.getContainerId() + " is not RUNNING",
- ContainerSubState.RUNNING, status.getContainerSubState());
- }
- System.out.println("\nStatus : [" + status + "]\n");
- }
- }
-
-
- /**
- * Start three OPPORTUNISTIC containers which in aggregates exceeds the
- * capacity of the node, yet whose utilization is low relative
- * to the resources they requested, resulting in a low node utilization.
- * Then try to start a GUARANTEED container. Even though the node has
- * nothing left unallocated, it is expected to start immediately
- * without killing any running OPPORTUNISTIC containers because the node
- * utilization is very low and overallocation is turned on.
- */
- @Test
- public void testKillNoOppContainersWithOverallocationLowUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(2,
- BuilderUtils.newResource(1024, 1), false));
- }
- }
- ));
- // All three GUARANTEED containers are all expected to start
- // because the containers utilization is low (0 at the point)
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
-
- // the containers utilization is low
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1024, 0, 1.0f/8));
-
- // start a GUARANTEED container that requests more than what's left
- // unallocated on the node: (512 + 1024 + 824) > 2048
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(3,
- BuilderUtils.newResource(512, 1), true))
- ));
-
- // the GUARANTEED container is expected be launched immediately without
- // killing any OPPORTUNISTIC containers
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(3), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.RUNNING);
- put(createContainerId(3), ContainerSubState.RUNNING);
- }
- });
- }
-
- /**
- * Start four OPPORTUNISTIC containers which in aggregates exceeds the
- * capacity of the node. The real resource utilization of the first two
- * OPPORTUNISTIC containers are high whereas that of the latter two are
- * almost zero. Then try to start a GUARANTEED container. The GUARANTEED
- * container will eventually start running after preempting the third
- * and fourth OPPORTUNISTIC container (which releases no resources) and
- * then the second OPPORTUNISTIC container.
- */
- @Test
- public void
- testKillOppContainersConservativelyWithOverallocationHighUtilization()
- throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(1024, 1), false));
- add(createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), false));
- add(createStartContainerRequest(3,
- BuilderUtils.newResource(1024, 1), false));
- }
- }
- ));
- // All four GUARANTEED containers are all expected to start
- // because the containers utilization is low (0 at the point)
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(3), ContainerSubState.RUNNING);
-
- // the containers utilization is at the overallocation threshold
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
- // try to start a GUARANTEED container when there's nothing left unallocated
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(4,
- BuilderUtils.newResource(1024, 1), true))
- ));
-
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(4), ContainerSubState.RUNNING);
- GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
- newInstance(new ArrayList<ContainerId>() {
- {
- add(createContainerId(0));
- add(createContainerId(1));
- add(createContainerId(2));
- add(createContainerId(3));
- add(createContainerId(4));
- }
- });
- List<ContainerStatus> containerStatuses = containerManager
- .getContainerStatuses(statRequest).getContainerStatuses();
- for (ContainerStatus status : containerStatuses) {
- if (status.getContainerId().equals(createContainerId(0)) ||
- status.getContainerId().equals(createContainerId(4))) {
- Assert.assertEquals(
- ContainerSubState.RUNNING, status.getContainerSubState());
- } else {
- Assert.assertTrue(status.getDiagnostics().contains(
- "Container Killed to make room for Guaranteed Container"));
- }
- System.out.println("\nStatus : [" + status + "]\n");
- }
- }
-
- /**
- * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
- * which in aggregate exceeds the capacity of the node. The first two
- * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
- * one utilizes nearly all of its resource requested. Then try to start two
- * more OPPORTUNISTIC containers. The two OPPORTUNISTIC containers are
- * expected to be queued immediately. Upon the completion of the
- * resource-usage-heavy GUARANTEED container, both OPPORTUNISTIC containers
- * are expected to start.
- */
- @Test
- public void testStartOppContainersUponContainerCompletion() throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(0,
- BuilderUtils.newResource(512, 1), false));
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(512, 1), false));
- add(createStartContainerRequest(2,
- BuilderUtils.newResource(1024, 1), true));
- }
- }
- ));
-
- // All three containers are all expected to start immediately
- // because the node utilization is low (0 at the point)
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
-
- // the contianers utilization is at the overallocation threshold
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(3,
- BuilderUtils.newResource(512, 1), false));
- add(createStartContainerRequest(4,
- BuilderUtils.newResource(512, 1), false));
- }
- }
- ));
- // the two new OPPORTUNISTIC containers are expected to be queued
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(3), ContainerSubState.SCHEDULED);
- put(createContainerId(4), ContainerSubState.SCHEDULED);
- }
- });
-
- // the GUARANTEED container is completed releasing resources
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(100, 0, 1.0f/5));
- allowContainerToSucceed(2);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.DONE);
-
- // the two OPPORTUNISTIC containers are expected to start together
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(3), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(4), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.DONE);
- put(createContainerId(3), ContainerSubState.RUNNING);
- put(createContainerId(4), ContainerSubState.RUNNING);
- }
- });
- }
-
- /**
- * Start one GUARANTEED container that consumes all the resources on the
- * node and keeps running, followed by two OPPORTUNISTIC containers that
- * will be queued forever because there is no containers starting or
- * finishing. Then try to start OPPORTUNISTIC containers out of band.
- */
- @Test
- public void testStartOpportunisticContainersOutOfBand() throws Exception {
- containerManager.start();
-
- containerManager.startContainers(StartContainersRequest.newInstance(
- Collections.singletonList(
- createStartContainerRequest(0,
- BuilderUtils.newResource(2048, 4), true))));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(0), ContainerSubState.RUNNING);
-
- // the container is fully utilizing its resources
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(2048, 0, 1.0f));
-
- // send two OPPORTUNISTIC container requests that are expected to be queued
- containerManager.startContainers(StartContainersRequest.newInstance(
- new ArrayList<StartContainerRequest>() {
- {
- add(createStartContainerRequest(1,
- BuilderUtils.newResource(512, 1), false));
- add(createStartContainerRequest(2,
- BuilderUtils.newResource(512, 1), false));
- }
- }
- ));
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.SCHEDULED);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.SCHEDULED);
-
- // the containers utilization dropped to the overallocation threshold
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
- // try to start containers out of band.
- ((LongRunningContainerSimulatingContainersManager)containerManager)
- .startContainersOutOfBandUponLowUtilization();
-
- // no containers in queue are expected to be launched because the
- // containers utilization is not below the over-allocation threshold
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.SCHEDULED);
- put(createContainerId(2), ContainerSubState.SCHEDULED);
- }
- });
-
- // the GUARANTEED container is completed releasing resources
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(100, 0, 1.0f/5));
-
- // the containers utilization dropped way below the overallocation threshold
- setContainerResourceUtilization(
- ResourceUtilization.newInstance(512, 0, 1.0f/8));
-
- ((LongRunningContainerSimulatingContainersManager)containerManager)
- .startContainersOutOfBandUponLowUtilization();
-
- // the two OPPORTUNISTIC containers are expected to be launched
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(1), ContainerSubState.RUNNING);
- BaseContainerManagerTest.waitForContainerSubState(containerManager,
- createContainerId(2), ContainerSubState.RUNNING);
-
- verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
- {
- put(createContainerId(0), ContainerSubState.RUNNING);
- put(createContainerId(1), ContainerSubState.RUNNING);
- put(createContainerId(2), ContainerSubState.RUNNING);
- }
- });
- }
-
-
- private void setContainerResourceUtilization(ResourceUtilization usage) {
- ((ContainerMonitorForOverallocationTest)
- containerManager.getContainersMonitor())
- .setContainerResourceUsage(usage);
- }
-
- private void allowContainerToSucceed(int containerId) {
- ((LongRunningContainerSimulatingContainerExecutor) this.exec)
- .containerSucceeded(createContainerId(containerId));
- }
-
-
- protected StartContainerRequest createStartContainerRequest(int containerId,
- Resource resource, boolean isGuaranteed) throws IOException {
- ContainerLaunchContext containerLaunchContext =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
- ExecutionType executionType = isGuaranteed ? ExecutionType.GUARANTEED :
- ExecutionType.OPPORTUNISTIC;
- Token containerToken = createContainerToken(
- createContainerId(containerId),
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
- context.getContainerTokenSecretManager(),
- null, executionType);
-
- return StartContainerRequest.newInstance(
- containerLaunchContext, containerToken);
- }
-
- protected void verifyContainerStatuses(
- Map<ContainerId, ContainerSubState> expected)
- throws IOException, YarnException {
- List<ContainerId> statList = new ArrayList<>(expected.keySet());
- GetContainerStatusesRequest statRequest =
- GetContainerStatusesRequest.newInstance(statList);
- List<ContainerStatus> containerStatuses = containerManager
- .getContainerStatuses(statRequest).getContainerStatuses();
-
- for (ContainerStatus status : containerStatuses) {
- ContainerId containerId = status.getContainerId();
- Assert.assertEquals(containerId + " is in unexpected state",
- expected.get(containerId), status.getContainerSubState());
- }
- }
-
- /**
- * A container manager that sends a dummy container pid while it's cleaning
- * up running containers. Used along with
- * LongRunningContainerSimulatingContainerExecutor to simulate long running
- * container processes for testing purposes.
- */
- private static class LongRunningContainerSimulatingContainersManager
- extends ContainerManagerImpl {
-
- private final String user;
-
- LongRunningContainerSimulatingContainersManager(
- Context context, ContainerExecutor exec,
- DeletionService deletionContext,
- NodeStatusUpdater nodeStatusUpdater,
- NodeManagerMetrics metrics,
- LocalDirsHandlerService dirsHandler, String user) {
- super(context, exec, deletionContext,
- nodeStatusUpdater, metrics, dirsHandler);
- this.user = user;
- }
-
- @Override
- protected UserGroupInformation getRemoteUgi() throws YarnException {
- ApplicationId appId = ApplicationId.newInstance(0, 0);
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 1);
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
- ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
- .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
- .getKeyId()));
- return ugi;
- }
-
- /**
- * Create a container launcher that signals container processes
- * with a dummy pid. The container processes are simulated in
- * LongRunningContainerSimulatingContainerExecutor which does
- * not write a pid file on behalf of containers to launch, so
- * the pid does not matter.
- */
- @Override
- protected ContainersLauncher createContainersLauncher(
- Context context, ContainerExecutor exec) {
- ContainerManagerImpl containerManager = this;
- return new ContainersLauncher(context, dispatcher, exec, dirsHandler,
- this) {
- @Override
- protected ContainerLaunch createContainerLaunch(
- Application app, Container container) {
- return new ContainerLaunch(context, getConfig(), dispatcher,
- exec, app, container, dirsHandler, containerManager) {
- @Override
- protected String getContainerPid(Path pidFilePath)
- throws Exception {
- return "123";
- }
-
- };
- }
- };
- }
-
- @Override
- protected ContainersMonitor createContainersMonitor(
- ContainerExecutor exec) {
- return new ContainerMonitorForOverallocationTest(exec,
- dispatcher, context);
- }
-
- public void startContainersOutOfBandUponLowUtilization() {
- ((ContainerMonitorForOverallocationTest) getContainersMonitor())
- .attemptToStartContainersUponLowUtilization();
- }
- }
-
- /**
- * A container executor that simulates long running container processes
- * by having container launch threads sleep infinitely until it's given
- * a signal to finish with either a success or failure exit code.
- */
- private static class LongRunningContainerSimulatingContainerExecutor
- extends DefaultContainerExecutor {
- private ConcurrentHashMap<ContainerId, ContainerFinishLatch> containers =
- new ConcurrentHashMap<>();
-
- public void containerSucceeded(ContainerId containerId) {
- ContainerFinishLatch containerFinishLatch = containers.get(containerId);
- if (containerFinishLatch != null) {
- containerFinishLatch.toSucceed();
- }
- }
-
- public void containerFailed(ContainerId containerId) {
- ContainerFinishLatch containerFinishLatch = containers.get(containerId);
- if (containerFinishLatch != null) {
- containerFinishLatch.toFail();
- }
- }
-
- /**
- * Simulate long running container processes by having container launcher
- * threads wait infinitely for a signal to finish.
- */
- @Override
- public int launchContainer(ContainerStartContext ctx)
- throws IOException, ConfigurationException {
- ContainerId container = ctx.getContainer().getContainerId();
- containers.putIfAbsent(container, new ContainerFinishLatch(container));
-
- // simulate a long running container process by having the
- // container launch thread sleep forever until it's given a
- // signal to finish with a exit code.
- while (!containers.get(container).toProceed) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- return -1;
- }
- }
-
- return containers.get(container).getContainerExitCode();
- }
-
- /**
- * Override signalContainer() so that simulated container processes
- * are properly cleaned up.
- */
- @Override
- public boolean signalContainer(ContainerSignalContext ctx)
- throws IOException {
- containerSucceeded(ctx.getContainer().getContainerId());
- return true;
- }
-
- /**
- * A signal that container launch threads wait for before exiting
- * in order to simulate long running container processes.
- */
- private static final class ContainerFinishLatch {
- volatile boolean toProceed;
- int exitCode;
- ContainerId container;
-
- ContainerFinishLatch(ContainerId containerId) {
- exitCode = 0;
- toProceed = false;
- container = containerId;
- }
-
- void toSucceed() {
- exitCode = 0;
- toProceed = true;
- }
-
- void toFail() {
- exitCode = -101;
- toProceed = true;
- }
-
- int getContainerExitCode() {
- // read barrier of toProceed to make sure the exit code is not stale
- if (toProceed) {
- LOG.debug(container + " finished with exit code: " + exitCode);
- }
- return exitCode;
- }
- }
- }
-
- /**
- * A test implementation of container monitor that allows control of
- * current resource utilization.
- */
- private static class ContainerMonitorForOverallocationTest
- extends ContainersMonitorImpl {
-
- private ResourceUtilization containerResourceUsage =
- ResourceUtilization.newInstance(0, 0, 0.0f);
-
- ContainerMonitorForOverallocationTest(ContainerExecutor exec,
- AsyncDispatcher dispatcher, Context context) {
- super(exec, dispatcher, context);
- }
-
- @Override
- public long getPmemAllocatedForContainers() {
- return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
- }
-
- @Override
- public long getVmemAllocatedForContainers() {
- float pmemRatio = getConfig().getFloat(
- YarnConfiguration.NM_VMEM_PMEM_RATIO,
- YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
- return (long) (pmemRatio * getPmemAllocatedForContainers());
- }
-
- @Override
- public long getVCoresAllocatedForContainers() {
- return NM_CONTAINERS_VCORES;
- }
-
- @Override
- public ContainersResourceUtilization getContainersUtilization(
- boolean latest) {
- return new ContainersMonitor.ContainersResourceUtilization(
- containerResourceUsage, System.currentTimeMillis());
- }
-
- public void setContainerResourceUsage(
- ResourceUtilization containerResourceUsage) {
- this.containerResourceUsage = containerResourceUsage;
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: Revert "YARN-6675. Add NM support to launch
opportunistic containers based on overallocation. Contributed by Haibo Chen."
Posted by ha...@apache.org.
Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."
This reverts commit c1362b68af03c546f4c0802758e9729d2372ab6c.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f9a7055b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f9a7055b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f9a7055b
Branch: refs/heads/YARN-1011
Commit: f9a7055b94f4c1d4187af6a189e20f51b6396fd6
Parents: c1362b6
Author: Haibo Chen <ha...@apache.org>
Authored: Tue May 8 13:50:06 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Tue May 8 13:50:06 2018 -0700
----------------------------------------------------------------------
.../nodemanager/NodeStatusUpdaterImpl.java | 2 +-
.../containermanager/ContainerManagerImpl.java | 8 +-
.../launcher/ContainerLaunch.java | 2 +-
.../launcher/ContainersLauncher.java | 9 +-
.../monitor/ContainersMonitor.java | 38 +-
.../monitor/ContainersMonitorImpl.java | 56 +-
.../AllocationBasedResourceTracker.java | 114 --
...locationBasedResourceUtilizationTracker.java | 158 +++
.../scheduler/ContainerScheduler.java | 318 ++---
.../scheduler/ContainerSchedulerEventType.java | 4 +-
.../scheduler/NMAllocationPolicy.java | 63 -
.../scheduler/ResourceUtilizationTracker.java | 17 +-
.../SnapshotBasedOverAllocationPolicy.java | 54 -
.../UtilizationBasedResourceTracker.java | 95 --
.../BaseContainerManagerTest.java | 35 -
.../TestContainersMonitorResourceChange.java | 9 +-
.../TestAllocationBasedResourceTracker.java | 82 --
...locationBasedResourceUtilizationTracker.java | 93 ++
.../TestContainerSchedulerRecovery.java | 58 +-
...estContainerSchedulerWithOverAllocation.java | 1121 ------------------
20 files changed, 420 insertions(+), 1916 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index a2f70d5..44f9740 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -532,7 +532,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private ResourceUtilization getContainersUtilization() {
ContainersMonitor containersMonitor =
this.context.getContainerManager().getContainersMonitor();
- return containersMonitor.getContainersUtilization(false).getUtilization();
+ return containersMonitor.getContainersUtilization();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 77ea4fa..3470910 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -243,12 +243,6 @@ public class ContainerManagerImpl extends CompositeService implements
metrics);
addService(rsrcLocalizationSrvc);
- this.containersMonitor = createContainersMonitor(exec);
- addService(this.containersMonitor);
-
- // ContainersLauncher must be added after ContainersMonitor
- // because the former depends on the latter to initialize
- // over-allocation first.
containersLauncher = createContainersLauncher(context, exec);
addService(containersLauncher);
@@ -273,6 +267,8 @@ public class ContainerManagerImpl extends CompositeService implements
nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher);
}
+ this.containersMonitor = createContainersMonitor(exec);
+ addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 5fb9b51..3875cbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1074,7 +1074,7 @@ public class ContainerLaunch implements Callable<Integer> {
* @return Process ID
* @throws Exception
*/
- protected String getContainerPid(Path pidFilePath) throws Exception {
+ private String getContainerPid(Path pidFilePath) throws Exception {
String containerIdStr =
container.getContainerId().toString();
String processId = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index c3d0a4d..cfd5d6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -116,7 +116,8 @@ public class ContainersLauncher extends AbstractService
containerId.getApplicationAttemptId().getApplicationId());
ContainerLaunch launch =
- createContainerLaunch(app, event.getContainer());
+ new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
+ event.getContainer(), dirsHandler, containerManager);
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
@@ -212,10 +213,4 @@ public class ContainersLauncher extends AbstractService
break;
}
}
-
- protected ContainerLaunch createContainerLaunch(
- Application app, Container container) {
- return new ContainerLaunch(context, getConfig(), dispatcher,
- exec, app, container, dirsHandler, containerManager);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 8da4ec4..64831e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -23,24 +23,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView {
-
- /**
- * Get the aggregate resource utilization of containers running on the node,
- * with a timestamp of the measurement.
- * @param latest true if the latest result should be returned
- * @return ResourceUtilization resource utilization of all containers
- */
- ContainersResourceUtilization getContainersUtilization(boolean latest);
-
- /**
- * Get the policy to over-allocate containers when over-allocation is on.
- * @return null if over-allocation is turned off
- */
- NMAllocationPolicy getContainerOverAllocationPolicy();
+ ResourceUtilization getContainersUtilization();
float getVmemRatio();
@@ -80,26 +66,4 @@ public interface ContainersMonitor extends Service,
* containersMonitor.getVmemRatio());
resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
}
-
- /**
- * A snapshot of resource utilization of all containers with the timestamp.
- */
- final class ContainersResourceUtilization {
- private final ResourceUtilization utilization;
- private final long timestamp;
-
- public ContainersResourceUtilization(
- ResourceUtilization utilization, long timestamp) {
- this.utilization = utilization;
- this.timestamp = timestamp;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public ResourceUtilization getUtilization() {
- return utilization;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index c3312f8..acc256f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -23,10 +23,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,9 +106,8 @@ public class ContainersMonitorImpl extends AbstractService implements
CPU, MEMORY
}
- private ContainersResourceUtilization latestContainersUtilization;
+ private ResourceUtilization containersUtilization;
- private NMAllocationPolicy overAllocationPolicy;
private ResourceThresholds overAllocationPreemptionThresholds;
private int overAlloctionPreemptionCpuCount = -1;
@@ -128,8 +123,7 @@ public class ContainersMonitorImpl extends AbstractService implements
this.monitoringThread = new MonitoringThread();
- this.latestContainersUtilization = new ContainersResourceUtilization(
- ResourceUtilization.newInstance(-1, -1, -1.0f), -1L);
+ this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
}
@Override
@@ -336,10 +330,6 @@ public class ContainersMonitorImpl extends AbstractService implements
this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance(
cpuPreemptionThreshold, memoryPreemptionThreshold);
- // TODO make this configurable
- this.overAllocationPolicy =
- createOverAllocationPolicy(resourceThresholds);
-
LOG.info("NodeManager oversubscription enabled with overallocation " +
"thresholds (memory:" + overAllocationMemoryUtilizationThreshold +
", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" +
@@ -347,11 +337,6 @@ public class ContainersMonitorImpl extends AbstractService implements
cpuPreemptionThreshold + ")");
}
- protected NMAllocationPolicy createOverAllocationPolicy(
- ResourceThresholds resourceThresholds) {
- return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
- }
-
private boolean isResourceCalculatorAvailable() {
if (resourceCalculatorPlugin == null) {
LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -624,12 +609,7 @@ public class ContainersMonitorImpl extends AbstractService implements
}
// Save the aggregated utilization of the containers
- setLatestContainersUtilization(trackedContainersUtilization);
-
- // check opportunity to start containers if over-allocation is on
- if (context.isOverAllocationEnabled()) {
- attemptToStartContainersUponLowUtilization();
- }
+ setContainersUtilization(trackedContainersUtilization);
// Publish the container utilization metrics to node manager
// metrics system.
@@ -995,34 +975,12 @@ public class ContainersMonitorImpl extends AbstractService implements
}
@Override
- public ContainersResourceUtilization getContainersUtilization(
- boolean latest) {
- // TODO update containerUtilization if latest is true
- return this.latestContainersUtilization;
- }
-
- @Override
- public NMAllocationPolicy getContainerOverAllocationPolicy() {
- return overAllocationPolicy;
- }
-
- private void setLatestContainersUtilization(ResourceUtilization utilization) {
- this.latestContainersUtilization = new ContainersResourceUtilization(
- utilization, System.currentTimeMillis());
+ public ResourceUtilization getContainersUtilization() {
+ return this.containersUtilization;
}
- @VisibleForTesting
- public void attemptToStartContainersUponLowUtilization() {
- if (getContainerOverAllocationPolicy() != null) {
- Resource available = getContainerOverAllocationPolicy()
- .getAvailableResources();
- if (available.getMemorySize() > 0 &&
- available.getVirtualCores() > 0) {
- eventDispatcher.getEventHandler().handle(
- new ContainerSchedulerEvent(null,
- ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
- }
- }
+ private void setContainersUtilization(ResourceUtilization utilization) {
+ this.containersUtilization = utilization;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
deleted file mode 100644
index 86b3698..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the resource utilization tracker that equates
- * resource utilization with the total resource allocated to the container.
- */
-public class AllocationBasedResourceTracker
- implements ResourceUtilizationTracker {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
-
- private static final Resource UNAVAILABLE =
- Resource.newInstance(0, 0);
-
- private ResourceUtilization containersAllocation;
- private ContainerScheduler scheduler;
-
-
- AllocationBasedResourceTracker(ContainerScheduler scheduler) {
- this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
- this.scheduler = scheduler;
- }
-
- /**
- * Get the accumulation of totally allocated resources to containers.
- * @return ResourceUtilization Resource Utilization.
- */
- @Override
- public ResourceUtilization getCurrentUtilization() {
- return this.containersAllocation;
- }
-
- /**
- * Get the amount of resources that have not been allocated to containers.
- * @return Resource resources that have not been allocated to containers.
- */
- protected Resource getUnallocatedResources() {
- // unallocated resources = node capacity - containers allocation
- // = -(container allocation - node capacity)
- ResourceUtilization allocationClone =
- ResourceUtilization.newInstance(containersAllocation);
- getContainersMonitor()
- .subtractNodeResourcesFromResourceUtilization(allocationClone);
-
- Resource unallocated = UNAVAILABLE;
- if (allocationClone.getCPU() <= 0 &&
- allocationClone.getPhysicalMemory() <= 0 &&
- allocationClone.getVirtualMemory() <= 0) {
- int cpu = Math.round(allocationClone.getCPU() *
- getContainersMonitor().getVCoresAllocatedForContainers());
- long memory = allocationClone.getPhysicalMemory();
- unallocated = Resource.newInstance(-memory, -cpu);
- }
- return unallocated;
- }
-
-
- @Override
- public Resource getAvailableResources() {
- return getUnallocatedResources();
- }
-
- /**
- * Add Container's resources to the accumulated allocation.
- * @param container Container.
- */
- @Override
- public void containerLaunched(Container container) {
- ContainersMonitor.increaseResourceUtilization(
- getContainersMonitor(), this.containersAllocation,
- container.getResource());
- }
-
- /**
- * Subtract Container's resources to the accumulated allocation.
- * @param container Container.
- */
- @Override
- public void containerReleased(Container container) {
- ContainersMonitor.decreaseResourceUtilization(
- getContainersMonitor(), this.containersAllocation,
- container.getResource());
- }
-
- public ContainersMonitor getContainersMonitor() {
- return this.scheduler.getContainersMonitor();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..6e2b617
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link ResourceUtilizationTracker} that equates
+ * resource utilization with the total resource allocated to the container.
+ */
+public class AllocationBasedResourceUtilizationTracker implements
+ ResourceUtilizationTracker {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
+
+ private ResourceUtilization containersAllocation;
+ private ContainerScheduler scheduler;
+
+ AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
+ this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Get the accumulation of totally allocated resources to a container.
+ * @return ResourceUtilization Resource Utilization.
+ */
+ @Override
+ public ResourceUtilization getCurrentUtilization() {
+ return this.containersAllocation;
+ }
+
+ /**
+ * Add Container's resources to the accumulated Utilization.
+ * @param container Container.
+ */
+ @Override
+ public void addContainerResources(Container container) {
+ ContainersMonitor.increaseResourceUtilization(
+ getContainersMonitor(), this.containersAllocation,
+ container.getResource());
+ }
+
+ /**
+ * Subtract Container's resources to the accumulated Utilization.
+ * @param container Container.
+ */
+ @Override
+ public void subtractContainerResource(Container container) {
+ ContainersMonitor.decreaseResourceUtilization(
+ getContainersMonitor(), this.containersAllocation,
+ container.getResource());
+ }
+
+ /**
+ * Check if NM has resources available currently to run the container.
+ * @param container Container.
+ * @return True, if NM has resources available currently to run the container.
+ */
+ @Override
+ public boolean hasResourcesAvailable(Container container) {
+ long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
+ return hasResourcesAvailable(pMemBytes,
+ (long) (getContainersMonitor().getVmemRatio()* pMemBytes),
+ container.getResource().getVirtualCores());
+ }
+
+ private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
+ int cpuVcores) {
+ // Check physical memory.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
+ this.containersAllocation.getPhysicalMemory(),
+ (pMemBytes >> 20),
+ (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
+ }
+ if (this.containersAllocation.getPhysicalMemory() +
+ (int) (pMemBytes >> 20) >
+ (int) (getContainersMonitor()
+ .getPmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("before vMemCheck" +
+ "[isEnabled={}, current={} + asked={} > allowed={}]",
+ getContainersMonitor().isVmemCheckEnabled(),
+ this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
+ (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
+ }
+ // Check virtual memory.
+ if (getContainersMonitor().isVmemCheckEnabled() &&
+ this.containersAllocation.getVirtualMemory() +
+ (int) (vMemBytes >> 20) >
+ (int) (getContainersMonitor()
+ .getVmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("before cpuCheck [asked={} > allowed={}]",
+ this.containersAllocation.getCPU(),
+ getContainersMonitor().getVCoresAllocatedForContainers());
+ }
+ // Check CPU. Compare using integral values of cores to avoid decimal
+ // inaccuracies.
+ if (!hasEnoughCpu(this.containersAllocation.getCPU(),
+ getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Returns whether there is enough space for coresRequested in totalCores.
+ * Converts currentAllocation usage to nearest integer count before comparing,
+ * as floats are inherently imprecise. NOTE: this calculation assumes that
+ * requested core counts must be integers, and currentAllocation core count
+ * must also be an integer.
+ *
+ * @param currentAllocation The current allocation, a float value from 0 to 1.
+ * @param totalCores The total cores in the system.
+ * @param coresRequested The number of cores requested.
+ * @return True if currentAllocationtotalCores*coresRequested <=
+ * totalCores.
+ */
+ public boolean hasEnoughCpu(float currentAllocation, long totalCores,
+ int coresRequested) {
+ // Must not cast here, as it would truncate the decimal digits.
+ return Math.round(currentAllocation * totalCores)
+ + coresRequested <= totalCores;
+ }
+
+ public ContainersMonitor getContainersMonitor() {
+ return this.scheduler.getContainersMonitor();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index e8341c9..d9b713f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -43,7 +42,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,10 +74,6 @@ public class ContainerScheduler extends AbstractService implements
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedGuaranteedContainers = new LinkedHashMap<>();
- // sum of the resources requested by guaranteed containers in queue
- private final Resource guaranteedResourcesDemanded =
- Resource.newInstance(0, 0);
-
// Queue of Opportunistic Containers waiting for resources to run
private final LinkedHashMap<ContainerId, Container>
queuedOpportunisticContainers = new LinkedHashMap<>();
@@ -88,10 +82,6 @@ public class ContainerScheduler extends AbstractService implements
// or paused to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
- // sum of the resources to be released by opportunistic containers that
- // have been marked to be killed or paused.
- private final Resource opportunisticResourcesToBeReleased =
- Resource.newInstance(0, 0);
// Containers launched by the Scheduler will take a while to actually
// move to the RUNNING state, but should still be fair game for killing
@@ -129,17 +119,6 @@ public class ContainerScheduler extends AbstractService implements
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
- @VisibleForTesting
- public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
- NodeManagerMetrics metrics, int qLength) {
- super(ContainerScheduler.class.getName());
- this.context = context;
- this.dispatcher = dispatcher;
- this.metrics = metrics;
- this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
- this.opportunisticContainersStatus =
- OpportunisticContainersStatus.newInstance();
- }
@Override
public void serviceInit(Configuration conf) throws Exception {
@@ -149,16 +128,20 @@ public class ContainerScheduler extends AbstractService implements
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
- // We assume over allocation configurations have been initialized
- this.utilizationTracker = getResourceTracker();
}
- private AllocationBasedResourceTracker getResourceTracker() {
- if (context.isOverAllocationEnabled()) {
- return new UtilizationBasedResourceTracker(this);
- } else {
- return new AllocationBasedResourceTracker(this);
- }
+ @VisibleForTesting
+ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+ NodeManagerMetrics metrics, int qLength) {
+ super(ContainerScheduler.class.getName());
+ this.context = context;
+ this.dispatcher = dispatcher;
+ this.metrics = metrics;
+ this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+ this.utilizationTracker =
+ new AllocationBasedResourceUtilizationTracker(this);
+ this.opportunisticContainersStatus =
+ OpportunisticContainersStatus.newInstance();
}
/**
@@ -181,18 +164,14 @@ public class ContainerScheduler extends AbstractService implements
if (event instanceof UpdateContainerSchedulerEvent) {
onUpdateContainer((UpdateContainerSchedulerEvent) event);
} else {
- LOG.error("Unknown event type on UpdateContainer: " + event.getType());
+ LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
}
break;
case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers();
break;
case RECOVERY_COMPLETED:
- startPendingContainers(false);
- break;
- case SCHEDULE_CONTAINERS:
- startPendingContainers(true);
- break;
+ startPendingContainers(maxOppQueueLength <= 0);
default:
LOG.error("Unknown event arrived at ContainerScheduler: "
+ event.toString());
@@ -207,10 +186,10 @@ public class ContainerScheduler extends AbstractService implements
ContainerId containerId = updateEvent.getContainer().getContainerId();
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
- this.utilizationTracker.containerReleased(
+ this.utilizationTracker.subtractContainerResource(
new ContainerImpl(getConfig(), null, null, null, null,
updateEvent.getOriginalToken(), context));
- this.utilizationTracker.containerLaunched(
+ this.utilizationTracker.addContainerResources(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
@@ -226,22 +205,20 @@ public class ContainerScheduler extends AbstractService implements
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
- Resources.addTo(guaranteedResourcesDemanded,
- updateEvent.getContainer().getResource());
- startPendingContainers(true);
+ //Kill/pause opportunistic containers if any to make room for
+ // promotion request
+ reclaimOpportunisticContainerResources(updateEvent.getContainer());
}
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
// containers
if (queuedGuaranteedContainers.remove(containerId) != null) {
- Resources.subtractFrom(guaranteedResourcesDemanded,
- updateEvent.getContainer().getResource());
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
- startPendingContainers(false);
}
}
+ startPendingContainers(maxOppQueueLength <= 0);
}
}
@@ -259,7 +236,6 @@ public class ContainerScheduler extends AbstractService implements
|| rcs == RecoveredContainerStatus.PAUSED) {
if (execType == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
} else if (execType == ExecutionType.OPPORTUNISTIC) {
queuedOpportunisticContainers
.put(container.getContainerId(), container);
@@ -270,7 +246,7 @@ public class ContainerScheduler extends AbstractService implements
}
} else if (rcs == RecoveredContainerStatus.LAUNCHED) {
runningContainers.put(container.getContainerId(), container);
- utilizationTracker.containerLaunched(container);
+ utilizationTracker.addContainerResources(container);
}
}
@@ -330,107 +306,65 @@ public class ContainerScheduler extends AbstractService implements
}
private void onResourcesReclaimed(Container container) {
- ContainerId containerId = container.getContainerId();
+ oppContainersToKill.remove(container.getContainerId());
// This could be killed externally for eg. by the ContainerManager,
// in which case, the container might still be queued.
- if (queuedOpportunisticContainers.remove(containerId) != null) {
- return;
- }
-
- // This could be killed externally for eg. by the ContainerManager,
- // in which case, the container might still be queued.
- if (queuedGuaranteedContainers.remove(containerId) != null) {
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
- return;
- }
-
- if (oppContainersToKill.remove(containerId) != null) {
- Resources.subtractFrom(
- opportunisticResourcesToBeReleased, container.getResource());
+ Container queued =
+ queuedOpportunisticContainers.remove(container.getContainerId());
+ if (queued == null) {
+ queuedGuaranteedContainers.remove(container.getContainerId());
}
// Requeue PAUSED containers
if (container.getContainerState() == ContainerState.PAUSED) {
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
- queuedGuaranteedContainers.put(containerId, container);
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
+ queuedGuaranteedContainers.put(container.getContainerId(), container);
} else {
- queuedOpportunisticContainers.put(containerId, container);
+ queuedOpportunisticContainers.put(
+ container.getContainerId(), container);
}
}
// decrement only if it was a running container
- Container completedContainer = runningContainers.remove(containerId);
+ Container completedContainer = runningContainers.remove(container
+ .getContainerId());
// only a running container releases resources upon completion
boolean resourceReleased = completedContainer != null;
if (resourceReleased) {
- this.utilizationTracker.containerReleased(container);
+ this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
-
- // In case of over-allocation being turned on, we may need to reclaim
- // more resources since the opportunistic containers that have been
- // killed or paused may have not released as much resource as we need.
- boolean reclaimOpportunisticResources = context.isOverAllocationEnabled();
- startPendingContainers(reclaimOpportunisticResources);
+ boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
}
}
/**
* Start pending containers in the queue.
- * @param reclaimOpportunisticResources if set to true, resources allocated
- * to running OPPORTUNISTIC containers will be reclaimed in
- * cases where there are GUARANTEED containers being queued
+ * @param forceStartGuaranteedContaieners When this is true, start guaranteed
+ * container without looking at available resource
*/
- private void startPendingContainers(boolean reclaimOpportunisticResources) {
- // When opportunistic container not allowed (which is determined by
- // max-queue length of pending opportunistic containers <= 0), start
- // guaranteed containers without looking at available resources and
- // skip scanning the queue of opportunistic containers
- if (maxOppQueueLength <= 0) {
- forcefullyStartGuaranteedContainers();
- return;
- }
-
- Resource available = utilizationTracker.getAvailableResources();
-
- // Start guaranteed containers that are queued, if resources available.
- boolean allGuaranteedContainersLaunched =
- startGuaranteedContainers(available);
- // Start opportunistic containers, if resources available, which is true
- // if all guaranteed containers in queue have been launched.
- if (allGuaranteedContainersLaunched) {
- startOpportunisticContainers(available);
- } else {
- // If not all guaranteed containers in queue are launched, we may need
- // to reclaim resources from opportunistic containers that are running.
- if (reclaimOpportunisticResources) {
- reclaimOpportunisticContainerResources();
- }
+ private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
+ // Start guaranteed containers that are paused, if resources available.
+ boolean resourcesAvailable = startContainers(
+ queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+ // Start opportunistic containers, if resources available.
+ if (resourcesAvailable) {
+ startContainers(queuedOpportunisticContainers.values(), false);
}
}
- /**
- * Try to launch as many GUARANTEED containers as possible.
- * @param available the amount of resources available to launch containers
- * @return true if all queued GUARANTEED containers are launched
- * or there is no GUARANTEED containers to launch
- */
- private boolean startGuaranteedContainers(Resource available) {
- Iterator<Container> cIter =
- queuedGuaranteedContainers.values().iterator();
+ private boolean startContainers(
+ Collection<Container> containersToBeStarted, boolean force) {
+ Iterator<Container> cIter = containersToBeStarted.iterator();
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
- if (isResourceAvailable(available, container)) {
- startContainer(container);
- Resources.subtractFrom(available, container.getResource());
+ if (tryStartContainer(container, force)) {
cIter.remove();
- Resources.subtractFrom(
- guaranteedResourcesDemanded, container.getResource());
} else {
resourcesAvailable = false;
}
@@ -438,49 +372,25 @@ public class ContainerScheduler extends AbstractService implements
return resourcesAvailable;
}
- /**
- * Launch all queued GUARANTEED containers without checking resource
- * availability. This is an optimization in cases where OPPORTUNISTIC
- * containers are not allowed on the node.
- */
- private void forcefullyStartGuaranteedContainers() {
- Iterator<Container> cIter =
- queuedGuaranteedContainers.values().iterator();
- while (cIter.hasNext()) {
- Container container = cIter.next();
+ private boolean tryStartContainer(Container container, boolean force) {
+ boolean containerStarted = false;
+ // call startContainer without checking available resource when force==true
+ if (force || resourceAvailableToStartContainer(
+ container)) {
startContainer(container);
- cIter.remove();
- Resources.subtractFrom(
- guaranteedResourcesDemanded, container.getResource());
+ containerStarted = true;
}
+ return containerStarted;
}
+
/**
- * Try to launch as many OPPORTUNISTIC containers as possible.
- * @param available the amount of resources available to launch containers
- * @return true if all OPPORTUNISTIC containers are launched
- * or there is no OPPORTUNISTIC containers to launch
+ * Check if there is resource available to start a given container
+ * immediately. (This can be extended to include overallocated resources)
+ * @param container the container to start
+ * @return true if container can be launched directly
*/
- private boolean startOpportunisticContainers(Resource available) {
- Iterator<Container> cIter =
- queuedOpportunisticContainers.values().iterator();
- boolean resourcesAvailable = true;
- while (cIter.hasNext() && resourcesAvailable) {
- Container container = cIter.next();
- if (isResourceAvailable(available, container)) {
- startContainer(container);
- Resources.subtractFrom(available, container.getResource());
- cIter.remove();
- } else {
- resourcesAvailable = false;
- }
- }
- return resourcesAvailable;
- }
-
- private static boolean isResourceAvailable(
- Resource resource, Container container) {
- Resource left = Resources.subtract(resource, container.getResource());
- return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0;
+ private boolean resourceAvailableToStartContainer(Container container) {
+ return this.utilizationTracker.hasResourcesAvailable(container);
}
private boolean enqueueContainer(Container container) {
@@ -490,7 +400,6 @@ public class ContainerScheduler extends AbstractService implements
boolean isQueued;
if (isGuaranteedContainer) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
- Resources.addTo(guaranteedResourcesDemanded, container.getResource());
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
@@ -535,7 +444,18 @@ public class ContainerScheduler extends AbstractService implements
// enough number of opportunistic containers.
if (isGuaranteedContainer) {
enqueueContainer(container);
- startPendingContainers(true);
+
+ // When opportunistic container not allowed (which is determined by
+ // max-queue length of pending opportunistic containers <= 0), start
+ // guaranteed containers without looking at available resources.
+ boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
+
+ // if the guaranteed container is queued, we need to preempt opportunistic
+ // containers for make room for it
+ if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
+ reclaimOpportunisticContainerResources(container);
+ }
} else {
// Given an opportunistic container, we first try to start as many queuing
// guaranteed containers as possible followed by queuing opportunistic
@@ -553,19 +473,19 @@ public class ContainerScheduler extends AbstractService implements
}
@SuppressWarnings("unchecked")
- private void reclaimOpportunisticContainerResources() {
+ private void reclaimOpportunisticContainerResources(Container container) {
List<Container> extraOppContainersToReclaim =
- pickOpportunisticContainersToReclaimResources();
- killOpportunisticContainers(extraOppContainersToReclaim);
- }
-
- private void killOpportunisticContainers(
- Collection<Container> containersToReclaim) {
- for (Container contToReclaim : containersToReclaim) {
+ pickOpportunisticContainersToReclaimResources(
+ container.getContainerId());
+ // Kill the opportunistic containers that were chosen.
+ for (Container contToReclaim : extraOppContainersToReclaim) {
String preemptionAction = usePauseEventForPreemption == true ? "paused" :
- "preempted";
- LOG.info("Container {} will be {} to start the execution of guaranteed" +
- " containers.", contToReclaim.getContainerId(), preemptionAction);
+ "resumed";
+ LOG.info(
+ "Container {} will be {} to start the "
+ + "execution of guaranteed container {}.",
+ contToReclaim.getContainerId(), preemptionAction,
+ container.getContainerId());
if (usePauseEventForPreemption) {
contToReclaim.sendPauseEvent(
@@ -576,15 +496,13 @@ public class ContainerScheduler extends AbstractService implements
"Container Killed to make room for Guaranteed Container.");
}
oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
- Resources.addTo(
- opportunisticResourcesToBeReleased, contToReclaim.getResource());
}
}
private void startContainer(Container container) {
LOG.info("Starting container [" + container.getContainerId()+ "]");
runningContainers.put(container.getContainerId(), container);
- this.utilizationTracker.containerLaunched(container);
+ this.utilizationTracker.addContainerResources(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.startOpportunisticContainer(container.getResource());
@@ -592,12 +510,14 @@ public class ContainerScheduler extends AbstractService implements
container.sendLaunchEvent();
}
- private List<Container> pickOpportunisticContainersToReclaimResources() {
+ private List<Container> pickOpportunisticContainersToReclaimResources(
+ ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.
List<Container> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
- ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
+ ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+ containerToStartId);
// Go over the running opportunistic containers.
// Use a descending iterator to kill more recently started containers.
@@ -616,19 +536,15 @@ public class ContainerScheduler extends AbstractService implements
continue;
}
extraOpportContainersToKill.add(runningCont);
- // In the case of over-allocation, the running container may not
- // release as much resources as it has requested, but we'll check
- // again if more containers need to be killed/paused when this
- // container is released.
ContainersMonitor.decreaseResourceUtilization(
getContainersMonitor(), resourcesToFreeUp,
runningCont.getResource());
}
}
if (!hasSufficientResources(resourcesToFreeUp)) {
- LOG.warn("There are no sufficient resources to start guaranteed" +
- " containers at the moment. Opportunistic containers are in" +
- " the process of being killed to make room.");
+ LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
+ "at the moment. Opportunistic containers are in the process of" +
+ "being killed to make room.", containerToStartId);
}
return extraOpportContainersToKill;
}
@@ -643,42 +559,34 @@ public class ContainerScheduler extends AbstractService implements
* getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
}
- /**
- * Determine how much resources are needed to be freed up to launch the given
- * GUARANTEED container. Used to determine how many running OPPORTUNISTIC
- * containers need to be killed/paused, assuming OPPORTUNISTIC containers to
- * be killed/paused will release the amount of resources they have requested.
- *
- * If the node is over-allocating itself, this may cause not enough
- * OPPORTUNISTIC containers being killed/paused in cases where the running
- * OPPORTUNISTIC containers are not consuming fully their resource requests.
- * We'd check again upon container completion events to see if more running
- * OPPORTUNISTIC containers need to be killed/paused.
- *
- * @return the amount of resource needed to be reclaimed for this container
- */
- private ResourceUtilization resourcesToFreeUp() {
+ private ResourceUtilization resourcesToFreeUp(
+ ContainerId containerToStartId) {
// Get allocation of currently allocated containers.
ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
- .newInstance(0, 0, 0.0f);
-
- // Add to the allocation the allocation of pending guaranteed containers.
- ContainersMonitor.increaseResourceUtilization(getContainersMonitor(),
- resourceAllocationToFreeUp, guaranteedResourcesDemanded);
+ .newInstance(this.utilizationTracker.getCurrentUtilization());
+
+ // Add to the allocation the allocation of the pending guaranteed
+ // containers that will start before the current container will be started.
+ for (Container container : queuedGuaranteedContainers.values()) {
+ ContainersMonitor.increaseResourceUtilization(
+ getContainersMonitor(), resourceAllocationToFreeUp,
+ container.getResource());
+ if (container.getContainerId().equals(containerToStartId)) {
+ break;
+ }
+ }
// These resources are being freed, likely at the behest of another
// guaranteed container..
- ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
- resourceAllocationToFreeUp, opportunisticResourcesToBeReleased);
-
- // Deduct any remaining resources available
- Resource availableResources = utilizationTracker.getAvailableResources();
- if (availableResources.getVirtualCores() > 0 &&
- availableResources.getMemorySize() > 0) {
- ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
- resourceAllocationToFreeUp, availableResources);
+ for (Container container : oppContainersToKill.values()) {
+ ContainersMonitor.decreaseResourceUtilization(
+ getContainersMonitor(), resourceAllocationToFreeUp,
+ container.getResource());
}
+ // Subtract the overall node resources.
+ getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+ resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 9ad4f91..294eddf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -28,7 +28,5 @@ public enum ContainerSchedulerEventType {
// Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS,
CONTAINER_PAUSED,
- RECOVERY_COMPLETED,
- // Producer: Containers Monitor when over-allocation is on
- SCHEDULE_CONTAINERS
+ RECOVERY_COMPLETED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
deleted file mode 100644
index 58b73d2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-
-/**
- * Keeps track of containers utilization over time and determines how much
- * resources are available to launch containers when over-allocation is on.
- */
-public abstract class NMAllocationPolicy {
- protected final ResourceThresholds overAllocationThresholds;
- protected final ContainersMonitor containersMonitor;
-
- public NMAllocationPolicy(
- ResourceThresholds overAllocationThresholds,
- ContainersMonitor containersMonitor) {
- this.containersMonitor = containersMonitor;
- this.overAllocationThresholds = overAllocationThresholds;
- }
-
- /**
- * Handle container launch events.
- * @param container the container that has been launched
- */
- public void containerLaunched(Container container) {
-
- }
-
- /**
- * Handle container release events.
- * @param container the container that has been released
- */
- public void containerReleased(Container container) {
-
- }
-
- /**
- * Get the amount of resources to launch containers when
- * over-allocation is turned on.
- * @return the amount of resources available to launch containers
- */
- public abstract Resource getAvailableResources();
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 98d99c6..3c17eca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -39,20 +38,22 @@ public interface ResourceUtilizationTracker {
ResourceUtilization getCurrentUtilization();
/**
- * Get the amount of resources currently available to launch containers.
- * @return Resource resources available to launch containers
+ * Add Container's resources to Node Utilization.
+ * @param container Container.
*/
- Resource getAvailableResources();
+ void addContainerResources(Container container);
/**
- * Add Container's resources to Node Utilization upon container launch.
+ * Subtract Container's resources to Node Utilization.
* @param container Container.
*/
- void containerLaunched(Container container);
+ void subtractContainerResource(Container container);
/**
- * Subtract Container's resources to Node Utilization upon container release.
+ * Check if NM has resources available currently to run the container.
* @param container Container.
+ * @return True, if NM has resources available currently to run the container.
*/
- void containerReleased(Container container);
+ boolean hasResourcesAvailable(Container container);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
deleted file mode 100644
index f486506..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-
-/**
- * An implementation of NMAllocationPolicy based on the
- * snapshot of the latest containers utilization to determine how much
- * resources are available * to launch containers when over-allocation
- * is turned on.
- */
-public class SnapshotBasedOverAllocationPolicy
- extends NMAllocationPolicy {
-
- public SnapshotBasedOverAllocationPolicy(
- ResourceThresholds overAllocationThresholds,
- ContainersMonitor containersMonitor) {
- super(overAllocationThresholds, containersMonitor);
- }
-
- @Override
- public Resource getAvailableResources() {
- ResourceUtilization utilization =
- containersMonitor.getContainersUtilization(true).getUtilization();
- long memoryAvailable = Math.round(
- overAllocationThresholds.getMemoryThreshold() *
- containersMonitor.getPmemAllocatedForContainers()) -
- (utilization.getPhysicalMemory() << 20);
- int vcoreAvailable = Math.round(
- (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) *
- containersMonitor.getVCoresAllocatedForContainers());
- return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
deleted file mode 100644
index 6f9bc82..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-* An resource availability tracker that determines if there are resources
-* available based on if there are unallocated resources or if there are
-* un-utilized resources.
-*/
-public class UtilizationBasedResourceTracker
- extends AllocationBasedResourceTracker {
- private static final Logger LOG =
- LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
-
- private final NMAllocationPolicy overAllocationPolicy;
-
- UtilizationBasedResourceTracker(ContainerScheduler scheduler) {
- super(scheduler);
- this.overAllocationPolicy =
- getContainersMonitor().getContainerOverAllocationPolicy();
- }
-
- @Override
- public void containerLaunched(Container container) {
- super.containerLaunched(container);
- if (overAllocationPolicy != null) {
- overAllocationPolicy.containerLaunched(container);
- }
- }
-
- @Override
- public void containerReleased(Container container) {
- super.containerReleased(container);
- if (overAllocationPolicy != null) {
- overAllocationPolicy.containerReleased(container);
- }
- }
-
- @Override
- public Resource getAvailableResources() {
- Resource resourceBasedOnAllocation = getUnallocatedResources();
- Resource resourceBasedOnUtilization =
- getResourcesAvailableBasedOnUtilization();
- if (LOG.isDebugEnabled()) {
- LOG.debug("The amount of resources available based on allocation is " +
- resourceBasedOnAllocation + ", based on utilization is " +
- resourceBasedOnUtilization);
- }
-
- return Resources.componentwiseMax(resourceBasedOnAllocation,
- resourceBasedOnUtilization);
- }
-
- /**
- * Get the amount of resources based on the slack between
- * the actual utilization and desired utilization.
- * @return Resource resource available
- */
- private Resource getResourcesAvailableBasedOnUtilization() {
- if (overAllocationPolicy == null) {
- return Resources.none();
- }
-
- return overAllocationPolicy.getAvailableResources();
- }
-
- @Override
- public ResourceUtilization getCurrentUtilization() {
- return getContainersMonitor().getContainersUtilization(false)
- .getUtilization();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 05e9dd0..93d0afb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
-import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -346,40 +345,6 @@ public abstract class BaseContainerManagerTest {
fStates.contains(containerStatus.getState()));
}
- public static void waitForContainerSubState(
- ContainerManagementProtocol containerManager, ContainerId containerID,
- ContainerSubState finalState)
- throws InterruptedException, YarnException, IOException {
- waitForContainerSubState(containerManager, containerID,
- Arrays.asList(finalState), 20);
- }
- public static void waitForContainerSubState(
- ContainerManagementProtocol containerManager, ContainerId containerID,
- List<ContainerSubState> finalStates, int timeOutMax)
- throws InterruptedException, YarnException, IOException {
- List<ContainerId> list = new ArrayList<>();
- list.add(containerID);
- GetContainerStatusesRequest request =
- GetContainerStatusesRequest.newInstance(list);
- ContainerStatus containerStatus;
- HashSet<ContainerSubState> fStates = new HashSet<>(finalStates);
- int timeoutSecs = 0;
- do {
- Thread.sleep(1000);
- containerStatus =
- containerManager.getContainerStatuses(request)
- .getContainerStatuses().get(0);
- LOG.info("Waiting for container to get into one of states " + fStates
- + ". Current state is " + containerStatus.getContainerSubState());
- timeoutSecs += 1;
- } while (!fStates.contains(containerStatus.getContainerSubState())
- && timeoutSecs < timeOutMax);
- LOG.info("Container state is " + containerStatus.getContainerSubState());
- Assert.assertTrue("ContainerSubState is not correct (timedout)",
- fStates.contains(containerStatus.getContainerSubState()));
- }
-
-
public static void waitForApplicationState(
ContainerManagerImpl containerManager, ApplicationId appID,
ApplicationState finalState)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 21d1889..d7d826c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -282,7 +282,7 @@ public class TestContainersMonitorResourceChange {
// will be 0.
assertEquals(
"Resource utilization must be default with MonitorThread's first run",
- 0, containersMonitor.getContainersUtilization(false).getUtilization()
+ 0, containersMonitor.getContainersUtilization()
.compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
// Verify the container utilization value. Since atleast one round is done,
@@ -297,9 +297,8 @@ public class TestContainersMonitorResourceChange {
ContainersMonitorImpl containersMonitor, int timeoutMsecs)
throws InterruptedException {
int timeWaiting = 0;
- while (0 == containersMonitor.getContainersUtilization(false)
- .getUtilization().compareTo(
- ResourceUtilization.newInstance(0, 0, 0.0f))) {
+ while (0 == containersMonitor.getContainersUtilization()
+ .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) {
if (timeWaiting >= timeoutMsecs) {
break;
}
@@ -311,7 +310,7 @@ public class TestContainersMonitorResourceChange {
}
assertTrue("Resource utilization is not changed from second run onwards",
- 0 != containersMonitor.getContainersUtilization(false).getUtilization()
+ 0 != containersMonitor.getContainersUtilization()
.compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
deleted file mode 100644
index 1e8bfdf..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the {@link AllocationBasedResourceTracker} class.
- */
-public class TestAllocationBasedResourceTracker {
-
- private ContainerScheduler mockContainerScheduler;
-
- @Before
- public void setup() {
- mockContainerScheduler = mock(ContainerScheduler.class);
- ContainersMonitor containersMonitor =
- new ContainersMonitorImpl(mock(ContainerExecutor.class),
- mock(AsyncDispatcher.class), mock(Context.class));
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
- conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
- conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
- conf.setInt(YarnConfiguration.NM_VCORES, 8);
- containersMonitor.init(conf);
- when(mockContainerScheduler.getContainersMonitor())
- .thenReturn(containersMonitor);
- }
-
- /**
- * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
- * hasResourceAvailable should return false.
- */
- @Test
- public void testHasResourcesAvailable() {
- AllocationBasedResourceTracker tracker =
- new AllocationBasedResourceTracker(mockContainerScheduler);
- Container testContainer = mock(Container.class);
- when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
- for (int i = 0; i < 2; i++) {
- Assert.assertTrue(
- isResourcesAvailable(tracker.getAvailableResources(), testContainer));
- tracker.containerLaunched(testContainer);
- }
- Assert.assertFalse(
- isResourcesAvailable(tracker.getAvailableResources(), testContainer));
- }
-
- private static boolean isResourcesAvailable(
- Resource available, Container container) {
- return available.compareTo(container.getResource()) >= 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..82c2147
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link AllocationBasedResourceUtilizationTracker} class.
+ */
+public class TestAllocationBasedResourceUtilizationTracker {
+
+ private ContainerScheduler mockContainerScheduler;
+
+ @Before
+ public void setup() {
+ mockContainerScheduler = mock(ContainerScheduler.class);
+ ContainersMonitor containersMonitor =
+ new ContainersMonitorImpl(mock(ContainerExecutor.class),
+ mock(AsyncDispatcher.class), mock(Context.class));
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
+ conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+ conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
+ conf.setInt(YarnConfiguration.NM_VCORES, 8);
+ containersMonitor.init(conf);
+ when(mockContainerScheduler.getContainersMonitor())
+ .thenReturn(containersMonitor);
+ }
+
+ /**
+ * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
+ * hasResourceAvailable should return false.
+ */
+ @Test
+ public void testHasResourcesAvailable() {
+ AllocationBasedResourceUtilizationTracker tracker =
+ new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
+ Container testContainer = mock(Container.class);
+ when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(tracker.hasResourcesAvailable(testContainer));
+ tracker.addContainerResources(testContainer);
+ }
+ Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
+ }
+
+ /**
+ * Test the case where the current allocation has been truncated to 0.8888891
+ * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return
+ * true.
+ */
+ @Test
+ public void testHasEnoughCpu() {
+ AllocationBasedResourceUtilizationTracker tracker =
+ new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
+ float currentAllocation = 0.8888891f;
+ long totalCores = 9;
+ int alreadyUsedCores = 8;
+ Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores,
+ (int) totalCores - alreadyUsedCores));
+ Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores,
+ (int) totalCores - alreadyUsedCores + 1));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org