You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/07/19 22:35:21 UTC
[29/50] [abbrv] hadoop git commit: YARN-6706. Refactor
ContainerScheduler to make oversubscription change easier. (Haibo Chen via
asuresh)
YARN-6706. Refactor ContainerScheduler to make oversubscription change easier. (Haibo Chen via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b007921
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b007921
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b007921
Branch: refs/heads/HDFS-7240
Commit: 5b007921cdf01ecc8ed97c164b7d327b8304c529
Parents: ed27f2b
Author: Arun Suresh <as...@apache.org>
Authored: Mon Jul 17 14:07:23 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Mon Jul 17 14:11:14 2017 -0700
----------------------------------------------------------------------
.../scheduler/ContainerScheduler.java | 135 +++++++++++++------
.../TestContainerManagerRecovery.java | 2 +-
.../TestContainerSchedulerQueuing.java | 85 ++++++++++++
3 files changed, 177 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b007921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 24530b3..19243ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -192,7 +192,9 @@ public class ContainerScheduler extends AbstractService implements
// decrement only if it was a running container
Container completedContainer = runningContainers.remove(container
.getContainerId());
- if (completedContainer != null) {
+ // only a running container releases resources upon completion
+ boolean resourceReleased = completedContainer != null;
+ if (resourceReleased) {
this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
@@ -218,8 +220,7 @@ public class ContainerScheduler extends AbstractService implements
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
- if (this.utilizationTracker.hasResourcesAvailable(container)) {
- startAllocatedContainer(container);
+ if (tryStartContainer(container)) {
cIter.remove();
} else {
resourcesAvailable = false;
@@ -228,50 +229,95 @@ public class ContainerScheduler extends AbstractService implements
return resourcesAvailable;
}
- @VisibleForTesting
- protected void scheduleContainer(Container container) {
- if (maxOppQueueLength <= 0) {
- startAllocatedContainer(container);
- return;
+ private boolean tryStartContainer(Container container) {
+ boolean containerStarted = false;
+ if (resourceAvailableToStartContainer(container)) {
+ startContainer(container);
+ containerStarted = true;
}
- if (queuedGuaranteedContainers.isEmpty() &&
- queuedOpportunisticContainers.isEmpty() &&
- this.utilizationTracker.hasResourcesAvailable(container)) {
- startAllocatedContainer(container);
+ return containerStarted;
+ }
+
+ /**
+ * Check if there is resource available to start a given container
+ * immediately. (This can be extended to include overallocated resources)
+ * @param container the container to start
+ * @return true if container can be launched directly
+ */
+ private boolean resourceAvailableToStartContainer(Container container) {
+ return this.utilizationTracker.hasResourcesAvailable(container);
+ }
+
+ private boolean enqueueContainer(Container container) {
+ boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
+ getExecutionType() == ExecutionType.GUARANTEED;
+
+ boolean isQueued;
+ if (isGuaranteedContainer) {
+ queuedGuaranteedContainers.put(container.getContainerId(), container);
+ isQueued = true;
} else {
- LOG.info("No available resources for container {} to start its execution "
- + "immediately.", container.getContainerId());
- boolean isQueued = true;
- if (container.getContainerTokenIdentifier().getExecutionType() ==
- ExecutionType.GUARANTEED) {
- queuedGuaranteedContainers.put(container.getContainerId(), container);
- // Kill running opportunistic containers to make space for
- // guaranteed container.
- killOpportunisticContainers(container);
+ if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+ LOG.info("Opportunistic container {} will be queued at the NM.",
+ container.getContainerId());
+ queuedOpportunisticContainers.put(
+ container.getContainerId(), container);
+ isQueued = true;
} else {
- if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
- LOG.info("Opportunistic container {} will be queued at the NM.",
- container.getContainerId());
- queuedOpportunisticContainers.put(
- container.getContainerId(), container);
- } else {
- isQueued = false;
- LOG.info("Opportunistic container [{}] will not be queued at the NM" +
- "since max queue length [{}] has been reached",
- container.getContainerId(), maxOppQueueLength);
- container.sendKillEvent(
- ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
- "Opportunistic container queue is full.");
- }
+ LOG.info("Opportunistic container [{}] will not be queued at the NM" +
+ "since max queue length [{}] has been reached",
+ container.getContainerId(), maxOppQueueLength);
+ container.sendKillEvent(
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+ "Opportunistic container queue is full.");
+ isQueued = false;
}
- if (isQueued) {
- try {
- this.context.getNMStateStore().storeContainerQueued(
- container.getContainerId());
- } catch (IOException e) {
- LOG.warn("Could not store container [" + container.getContainerId()
- + "] state. The Container has been queued.", e);
- }
+ }
+
+ if (isQueued) {
+ try {
+ this.context.getNMStateStore().storeContainerQueued(
+ container.getContainerId());
+ } catch (IOException e) {
+ LOG.warn("Could not store container [" + container.getContainerId()
+ + "] state. The Container has been queued.", e);
+ }
+ }
+
+ return isQueued;
+ }
+
+ @VisibleForTesting
+ protected void scheduleContainer(Container container) {
+ boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
+ getExecutionType() == ExecutionType.GUARANTEED;
+
+ // Given a guaranteed container, we enqueue it first and then try to start
+ // as many queuing guaranteed containers as possible followed by queuing
+ // opportunistic containers based on remaining resources available. If the
+ // container still stays in the queue afterwards, we need to preempt just
+ // enough opportunistic containers.
+ if (isGuaranteedContainer) {
+ enqueueContainer(container);
+ startPendingContainers();
+
+ // if the guaranteed container is queued, we need to preempt opportunistic
+ // containers to make room for it
+ if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
+ killOpportunisticContainers(container);
+ }
+ } else {
+ // Given an opportunistic container, we first try to start as many queuing
+ // guaranteed containers as possible followed by queuing opportunistic
+ // containers based on remaining resource available, then enqueue the
+ // opportunistic container. If the container is enqueued, we do another
+ // pass to try to start the newly enqueued opportunistic container.
+ startPendingContainers();
+ boolean containerQueued = enqueueContainer(container);
+ // container may not get queued because the max opportunistic container
+ // queue length is reached. If so, there is no point doing another pass
+ if (containerQueued) {
+ startPendingContainers();
}
}
}
@@ -292,7 +338,7 @@ public class ContainerScheduler extends AbstractService implements
}
}
- private void startAllocatedContainer(Container container) {
+ private void startContainer(Container container) {
LOG.info("Starting container [" + container.getContainerId()+ "]");
runningContainers.put(container.getContainerId(), container);
this.utilizationTracker.addContainerResources(container);
@@ -416,4 +462,5 @@ public class ContainerScheduler extends AbstractService implements
public ContainersMonitor getContainersMonitor() {
return this.context.getContainerManager().getContainersMonitor();
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b007921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 075d857..b1a7b4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -583,7 +583,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
@Override
public long getPmemAllocatedForContainers() {
- return 10240;
+ return (long) 2048 << 20;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b007921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 8264f2e..aeba399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -332,6 +332,91 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
}
/**
+ * Starts one GUARANTEED container that takes up the whole node's resources,
+ * and submits more OPPORTUNISTIC containers than the opportunistic container
+ * queue can hold. OPPORTUNISTIC containers that cannot be queued should be
+ * killed.
+ * @throws Exception
+ */
+ @Test
+ public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
+ containerManager.start();
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ List<StartContainerRequest> list = new ArrayList<>();
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.GUARANTEED)));
+
+ final int maxOppQueueLength = conf.getInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+ YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH);
+ for (int i = 1; i < maxOppQueueLength + 2; i++) {
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+ }
+
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(0), ContainerState.RUNNING, 40);
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(maxOppQueueLength + 1), ContainerState.DONE,
+ 40);
+ Thread.sleep(5000);
+
+ // Get container statuses. Container 0 should be running and container
+ // 1 to maxOppQueueLength should be queued and the last container should
+ // be killed
+ List<ContainerId> statList = new ArrayList<>();
+ for (int i = 0; i < maxOppQueueLength + 2; i++) {
+ statList.add(createContainerId(i));
+ }
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
+ if (status.getContainerId().equals(createContainerId(0))) {
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+ status.getState());
+ } else if (status.getContainerId().equals(createContainerId(
+ maxOppQueueLength + 1))) {
+ Assert.assertTrue(status.getDiagnostics().contains(
+ "Opportunistic container queue is full"));
+ } else {
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+ status.getState());
+ }
+ System.out.println("\nStatus : [" + status + "]\n");
+ }
+
+ ContainerScheduler containerScheduler =
+ containerManager.getContainerScheduler();
+ Assert.assertEquals(maxOppQueueLength,
+ containerScheduler.getNumQueuedContainers());
+ Assert.assertEquals(0,
+ containerScheduler.getNumQueuedGuaranteedContainers());
+ Assert.assertEquals(maxOppQueueLength,
+ containerScheduler.getNumQueuedOpportunisticContainers());
+ }
+
+ /**
* Submit two OPPORTUNISTIC and one GUARANTEED containers. The resource
* requests by each container are such that only one can run in parallel.
* Thus, the OPPORTUNISTIC container that started running, will be
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org