You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/18 19:25:33 UTC
[33/46] hadoop git commit: Revert "CapacityScheduler: Improve
preemption to only kill containers that would satisfy the incoming request.
(Wangda Tan)"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
deleted file mode 100644
index 19148d7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-public class PreemptableQueue {
- // Partition -> killable resources and containers
- private Map<String, Resource> totalKillableResources = new HashMap<>();
- private Map<String, Map<ContainerId, RMContainer>> killableContainers =
- new HashMap<>();
- private PreemptableQueue parent;
-
- public PreemptableQueue(PreemptableQueue parent) {
- this.parent = parent;
- }
-
- public PreemptableQueue(Map<String, Resource> totalKillableResources,
- Map<String, Map<ContainerId, RMContainer>> killableContainers) {
- this.totalKillableResources = totalKillableResources;
- this.killableContainers = killableContainers;
- }
-
- void addKillableContainer(KillableContainer container) {
- String partition = container.getNodePartition();
- if (!totalKillableResources.containsKey(partition)) {
- totalKillableResources.put(partition, Resources.createResource(0));
- killableContainers.put(partition,
- new ConcurrentSkipListMap<ContainerId, RMContainer>());
- }
-
- RMContainer c = container.getRMContainer();
- Resources.addTo(totalKillableResources.get(partition),
- c.getAllocatedResource());
- killableContainers.get(partition).put(c.getContainerId(), c);
-
- if (null != parent) {
- parent.addKillableContainer(container);
- }
- }
-
- void removeKillableContainer(KillableContainer container) {
- String partition = container.getNodePartition();
- Map<ContainerId, RMContainer> partitionKillableContainers =
- killableContainers.get(partition);
- if (partitionKillableContainers != null) {
- RMContainer rmContainer = partitionKillableContainers.remove(
- container.getRMContainer().getContainerId());
- if (null != rmContainer) {
- Resources.subtractFrom(totalKillableResources.get(partition),
- rmContainer.getAllocatedResource());
- }
- }
-
- if (null != parent) {
- parent.removeKillableContainer(container);
- }
- }
-
- public Resource getKillableResource(String partition) {
- Resource res = totalKillableResources.get(partition);
- return res == null ? Resources.none() : res;
- }
-
- @SuppressWarnings("unchecked")
- public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
- Map<ContainerId, RMContainer> map = killableContainers.get(partition);
- return map == null ? Collections.EMPTY_MAP : map;
- }
-
- public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
- return killableContainers;
- }
-
- Map<String, Resource> getTotalKillableResources() {
- return totalKillableResources;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
deleted file mode 100644
index a9f02a5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class PreemptionManager {
- private ReentrantReadWriteLock.ReadLock readLock;
- private ReentrantReadWriteLock.WriteLock writeLock;
- private Map<String, PreemptableQueue> entities = new HashMap<>();
-
- public PreemptionManager() {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- readLock = lock.readLock();
- writeLock = lock.writeLock();
- }
-
- public void refreshQueues(CSQueue parent, CSQueue current) {
- try {
- writeLock.lock();
- PreemptableQueue parentEntity = null;
- if (parent != null) {
- parentEntity = entities.get(parent.getQueueName());
- }
-
- if (!entities.containsKey(current.getQueueName())) {
- entities.put(current.getQueueName(),
- new PreemptableQueue(parentEntity));
- }
-
- if (current.getChildQueues() != null) {
- for (CSQueue child : current.getChildQueues()) {
- refreshQueues(current, child);
- }
- }
- }
- finally {
- writeLock.unlock();
- }
- }
-
- public void addKillableContainer(KillableContainer container) {
- try {
- writeLock.lock();
- PreemptableQueue entity = entities.get(container.getLeafQueueName());
- if (null != entity) {
- entity.addKillableContainer(container);
- }
- }
- finally {
- writeLock.unlock();
- }
- }
-
- public void removeKillableContainer(KillableContainer container) {
- try {
- writeLock.lock();
- PreemptableQueue entity = entities.get(container.getLeafQueueName());
- if (null != entity) {
- entity.removeKillableContainer(container);
- }
- }
- finally {
- writeLock.unlock();
- }
- }
-
- public void moveKillableContainer(KillableContainer oldContainer,
- KillableContainer newContainer) {
- // TODO, will be called when partition of the node changed OR
- // container moved to different queue
- }
-
- public void updateKillableContainerResource(KillableContainer container,
- Resource oldResource, Resource newResource) {
- // TODO, will be called when container's resource changed
- }
-
- @VisibleForTesting
- public Map<ContainerId, RMContainer> getKillableContainersMap(
- String queueName, String partition) {
- try {
- readLock.lock();
- PreemptableQueue entity = entities.get(queueName);
- if (entity != null) {
- Map<ContainerId, RMContainer> containers =
- entity.getKillableContainers().get(partition);
- if (containers != null) {
- return containers;
- }
- }
- return Collections.emptyMap();
- }
- finally {
- readLock.unlock();
- }
- }
-
- public Iterator<RMContainer> getKillableContainers(String queueName,
- String partition) {
- return getKillableContainersMap(queueName, partition).values().iterator();
- }
-
- public Resource getKillableResource(String queueName, String partition) {
- try {
- readLock.lock();
- PreemptableQueue entity = entities.get(queueName);
- if (entity != null) {
- Resource res = entity.getTotalKillableResources().get(partition);
- if (res == null || res.equals(Resources.none())) {
- return Resources.none();
- }
- return Resources.clone(res);
- }
- return Resources.none();
- }
- finally {
- readLock.unlock();
- }
- }
-
- public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
- try {
- readLock.lock();
- Map<String, PreemptableQueue> map = new HashMap<>();
- for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
- String key = entry.getKey();
- PreemptableQueue entity = entry.getValue();
- map.put(key, new PreemptableQueue(
- new HashMap<>(entity.getTotalKillableResources()),
- new HashMap<>(entity.getKillableContainers())));
- }
- return map;
- } finally {
- readLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
index aad3bc7..5158255 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
@@ -120,9 +120,9 @@ public class AssignmentInformation {
}
private ContainerId getFirstContainerIdFromOperation(Operation op) {
- if (null != operationDetails.get(op)) {
+ if (null != operationDetails.get(Operation.ALLOCATION)) {
List<AssignmentDetails> assignDetails =
- operationDetails.get(op);
+ operationDetails.get(Operation.ALLOCATION);
if (!assignDetails.isEmpty()) {
return assignDetails.get(0).containerId;
}
@@ -131,7 +131,7 @@ public class AssignmentInformation {
}
public ContainerId getFirstAllocatedOrReservedContainerId() {
- ContainerId containerId;
+ ContainerId containerId = null;
containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
if (null != containerId) {
return containerId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f474aad..4d563cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -95,7 +94,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* to hold the message if its app doesn't not get container from a node
*/
private String appSkipNodeDiagnostics;
- private CapacitySchedulerContext capacitySchedulerContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -140,30 +138,28 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
containerAllocator = new ContainerAllocator(this, rc, rmContext);
-
- if (scheduler instanceof CapacityScheduler) {
- capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
- }
}
- public synchronized boolean containerCompleted(RMContainer rmContainer,
+ synchronized public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
- ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
- if (null == liveContainers.remove(containerId)) {
+ if (null == liveContainers.remove(rmContainer.getContainerId())) {
return false;
}
-
+
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
+ Container container = rmContainer.getContainer();
+ ContainerId containerId = container.getId();
+
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
- containersToPreempt.remove(containerId);
+ containersToPreempt.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
@@ -180,7 +176,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return true;
}
- public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
@@ -204,9 +200,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
-
- ContainerId containerId = container.getId();
- liveContainers.put(containerId, rmContainer);
+ liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
@@ -219,17 +213,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Inform the container
rmContainer.handle(
- new RMContainerEvent(containerId, RMContainerEventType.START));
+ new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
- + containerId.getApplicationAttemptId()
- + " container=" + containerId + " host="
+ + container.getId().getApplicationAttemptId()
+ + " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
- getApplicationId(), containerId);
+ getApplicationId(), container.getId());
return rmContainer;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 1d0e78a..fe6db47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -18,29 +18,22 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
+
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
- private Map<ContainerId, RMContainer> killableContainers = new HashMap<>();
- private Resource totalKillableResources = Resource.newInstance(0, 0);
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> nodeLabels) {
@@ -99,6 +92,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
@Override
public synchronized void unreserveResource(
SchedulerApplicationAttempt application) {
+
// adding NP checks as this can now be called for preemption
if (getReservedContainer() != null
&& getReservedContainer().getContainer() != null
@@ -121,55 +115,4 @@ public class FiCaSchedulerNode extends SchedulerNode {
}
setReservedContainer(null);
}
-
- // According to decisions from preemption policy, mark the container to killable
- public synchronized void markContainerToKillable(ContainerId containerId) {
- RMContainer c = launchedContainers.get(containerId);
- if (c != null && !killableContainers.containsKey(containerId)) {
- killableContainers.put(containerId, c);
- Resources.addTo(totalKillableResources, c.getAllocatedResource());
- }
- }
-
- // According to decisions from preemption policy, mark the container to
- // non-killable
- public synchronized void markContainerToNonKillable(ContainerId containerId) {
- RMContainer c = launchedContainers.get(containerId);
- if (c != null && killableContainers.containsKey(containerId)) {
- killableContainers.remove(containerId);
- Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());
- }
- }
-
- @Override
- protected synchronized void updateResource(
- Container container) {
- super.updateResource(container);
- if (killableContainers.containsKey(container.getId())) {
- Resources.subtractFrom(totalKillableResources, container.getResource());
- killableContainers.remove(container.getId());
- }
- }
-
- @Override
- protected synchronized void changeContainerResource(ContainerId containerId,
- Resource deltaResource, boolean increase) {
- super.changeContainerResource(containerId, deltaResource, increase);
-
- if (killableContainers.containsKey(containerId)) {
- if (increase) {
- Resources.addTo(totalKillableResources, deltaResource);
- } else {
- Resources.subtractFrom(totalKillableResources, deltaResource);
- }
- }
- }
-
- public synchronized Resource getTotalKillableResources() {
- return totalKillableResources;
- }
-
- public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
- return killableContainers;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 35b7c14..9cf09e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -38,15 +38,10 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
- /* Source: SchedulingEditPolicy */
+ // Source: SchedulingEditPolicy
KILL_RESERVED_CONTAINER,
-
- // Mark a container for preemption
- MARK_CONTAINER_FOR_PREEMPTION,
-
- // Mark a for-preemption container killable
- MARK_CONTAINER_FOR_KILLABLE,
-
- // Cancel a killable container
- MARK_CONTAINER_FOR_NONKILLABLE
+ MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption
+ // in the near future
+ KILL_PREEMPTED_CONTAINER // Kill a container previously marked for
+ // preemption
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index c944752..d9306dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -59,7 +59,7 @@ public class TestRMDispatcher {
rmDispatcher.getEventHandler().handle(event1);
ContainerPreemptEvent event2 =
new ContainerPreemptEvent(appAttemptId, container,
- SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE);
+ SchedulerEventType.KILL_PREEMPTED_CONTAINER);
rmDispatcher.getEventHandler().handle(event2);
ContainerPreemptEvent event3 =
new ContainerPreemptEvent(appAttemptId, container,
@@ -70,7 +70,7 @@ public class TestRMDispatcher {
verify(sched, times(3)).handle(any(SchedulerEvent.class));
verify(sched).killReservedContainer(container);
verify(sched).markContainerForPreemption(appAttemptId, container);
- verify(sched).markContainerForKillable(container);
+ verify(sched).killPreemptedContainer(container);
} catch (InterruptedException e) {
Assert.fail();
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 3057615..028afb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -2352,7 +2352,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app0.getApplicationId()).getCurrentAppAttempt();
// kill app0-attempt
- cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
+ cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(
app0.getCurrentAppAttempt().getMasterContainer().getId()));
am0.waitForState(RMAppAttemptState.FAILED);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 16f3f60..5035afe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@@ -565,7 +566,7 @@ public class TestAMRestart {
ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt;
- scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+ scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
@@ -581,7 +582,7 @@ public class TestAMRestart {
// Preempt the second attempt.
ContainerId amContainer2 =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
- scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
+ scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
@@ -676,7 +677,7 @@ public class TestAMRestart {
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Forcibly preempt the am container;
- scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+ scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index e9129de..13f267d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -168,7 +167,6 @@ public class TestProportionalCapacityPreemptionPolicy {
when(mCS.getConfiguration()).thenReturn(schedConf);
rmContext = mock(RMContext.class);
when(mCS.getRMContext()).thenReturn(rmContext);
- when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
Dispatcher disp = mock(Dispatcher.class);
@@ -291,7 +289,7 @@ public class TestProportionalCapacityPreemptionPolicy {
List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 20)) {
assertEquals(appC, e.getAppId());
- assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
+ assertEquals(KILL_PREEMPTED_CONTAINER, e.getType());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 21ea495..512f37c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -124,7 +123,6 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
mClock = mock(Clock.class);
cs = mock(CapacityScheduler.class);
when(cs.getResourceCalculator()).thenReturn(rc);
- when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
nlm = mock(RMNodeLabelsManager.class);
mDisp = mock(EventHandler.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 171196f..0b32676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -265,7 +264,6 @@ public class TestApplicationLimits {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
- when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Say cluster has 100 nodes of 16G each
Resource clusterResource =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index d8161f8..1569a12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -205,7 +205,7 @@ public class TestApplicationPriority {
if (++counter > 2) {
break;
}
- cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
+ cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check node report, 12 GB used and 4 GB available
@@ -512,7 +512,7 @@ public class TestApplicationPriority {
if (++counter > 2) {
break;
}
- cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+ cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
@@ -542,7 +542,7 @@ public class TestApplicationPriority {
if (++counter > 1) {
break;
}
- cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+ cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 16ba607..b6c005b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -1188,7 +1188,7 @@ public class TestCapacityScheduler {
// kill the 3 containers
for (Container c : allocatedContainers) {
- cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
+ cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check values
@@ -1197,7 +1197,7 @@ public class TestCapacityScheduler {
Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
// kill app0-attempt0 AM container
- cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
+ cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId()));
// wait for app0 failed
@@ -1220,7 +1220,7 @@ public class TestCapacityScheduler {
allocatedContainers =
am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
for (Container c : allocatedContainers) {
- cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
+ cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check values
@@ -1269,7 +1269,7 @@ public class TestCapacityScheduler {
}
// Call killContainer to preempt the container
- cs.markContainerForKillable(rmContainer);
+ cs.killPreemptedContainer(rmContainer);
Assert.assertEquals(3, requests.size());
for (ResourceRequest request : requests) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index bea7797..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,677 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCapacitySchedulerPreemption {
- private static final Log LOG = LogFactory.getLog(
- TestCapacitySchedulerPreemption.class);
-
- private final int GB = 1024;
-
- private Configuration conf;
-
- RMNodeLabelsManager mgr;
-
- Clock clock;
-
- @Before
- public void setUp() throws Exception {
- conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
- conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
- ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
- conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
- // Set preemption related configurations
- conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
- 0);
- conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
- true);
- conf.setFloat(
- ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
- conf.setFloat(
- ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
- mgr = new NullRMNodeLabelsManager();
- mgr.init(this.conf);
- clock = mock(Clock.class);
- when(clock.getTime()).thenReturn(0L);
- }
-
- private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
- RMActiveServices activeServices = rm.getRMActiveService();
- SchedulingMonitor mon = null;
- for (Service service : activeServices.getServices()) {
- if (service instanceof SchedulingMonitor) {
- mon = (SchedulingMonitor) service;
- break;
- }
- }
-
- if (mon != null) {
- return mon.getSchedulingEditPolicy();
- }
- return null;
- }
-
- @Test (timeout = 60000)
- public void testSimplePreemption() throws Exception {
- /**
- * Test case: Submit two application (app1/app2) to different queues, queue
- * structure:
- *
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- *
- * 1) Two nodes in the cluster, each of them has 4G.
- *
- * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
- * more resource available.
- *
- * 3) app2 submit to queue-c, ask for one 1G container (for AM)
- *
- * Now the cluster is fulfilled.
- *
- * 4) app2 asks for another 1G container, system will preempt one container
- * from app1, and app2 will receive the preempted container
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
-
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getUnallocatedResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getUnallocatedResource().getMemory());
-
- // AM asks for a 1 * GB container
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(1, killableContainers.size());
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 6 containers, and app2 has 2 containers
- Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersNodeLocalityDelay()
- throws Exception {
- /**
- * Test case: same as testSimplePreemption steps 1-3.
- *
- * Step 4: app2 asks for 1G container with locality specified, so it needs
- * to wait for missed-opportunity before get scheduled.
- * Check if system waits missed-opportunity before finish killable container
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getUnallocatedResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getUnallocatedResource().getMemory());
-
- // AM asks for a 1 * GB container with unknown host and unknown rack
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1), ResourceRequest
- .newInstance(Priority.newInstance(1), "unknownhost",
- Resources.createResource(1 * GB), 1), ResourceRequest
- .newInstance(Priority.newInstance(1), "/default-rack",
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 containers (no container preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- // Do allocation again, one container will be preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- // App1 has 6 containers, and app2 has 2 containers (new container allocated)
- Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersHardNodeLocality()
- throws Exception {
- /**
- * Test case: same as testSimplePreemption steps 1-3.
- *
- * Step 4: app2 asks for 1G container with hard locality specified, and
- * asked host is not existed
- * Confirm system doesn't preempt any container.
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getUnallocatedResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getUnallocatedResource().getMemory());
-
- // AM asks for a 1 * GB container for h3 with hard locality,
- // h3 doesn't exist in the cluster
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1, true), ResourceRequest
- .newInstance(Priority.newInstance(1), "h3",
- Resources.createResource(1 * GB), 1, false), ResourceRequest
- .newInstance(Priority.newInstance(1), "/default-rack",
- Resources.createResource(1 * GB), 1, false)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 containers (no container preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- // Do allocation again, nothing will be preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- // App1 has 7 containers, and app2 has 1 containers (no container allocated)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
- throws Exception {
- /**
- * Test case:
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- * Submit applications to two queues, one uses more than the other, so
- * preemption will happen.
- *
- * Check:
- * 1) Killable containers resources will be excluded from PCPP (no duplicated
- * container added to killable list)
- * 2) When more resources need to be preempted, new containers will be selected
- * and killable containers will be considered
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 6 times for node1
- for (int i = 0; i < 6; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
- // NM1 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getUnallocatedResource().getMemory());
- am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
- // Get edit policy and do one update
- ProportionalCapacityPreemptionPolicy editPolicy =
- (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
- // Check killable containers and to-be-preempted containers in edit policy
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
- // Run edit schedule again, confirm status doesn't changed
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
- // Save current to kill containers
- Set<ContainerId> previousKillableContainers = new HashSet<>(
- pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
- .keySet());
-
- // Update request resource of c from 1 to 2, so we need to preempt
- // one more container
- am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
- // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
- // and 1 container in killable map
- editPolicy.editSchedule();
- Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
- // Call editPolicy.editSchedule() once more, we should have 2 containers killable map
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
- // Check if previous killable containers included by new killable containers
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
- Assert.assertTrue(
- Sets.difference(previousKillableContainers, killableContainers.keySet())
- .isEmpty());
- }
-
- @Test (timeout = 60000)
- public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
- throws Exception {
- /**
- * Test case:
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- * Submit applications to two queues, one uses more than the other, so
- * preemption will happen.
- *
- * Check:
- * 1) Containers will be marked to killable
- * 2) Cancel resource request
- * 3) Killable containers will be cancelled from policy and scheduler
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 6 times for node1
- for (int i = 0; i < 6; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
- // NM1 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getUnallocatedResource().getMemory());
- am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
- // Get edit policy and do one update
- ProportionalCapacityPreemptionPolicy editPolicy =
- (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if 3 container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
- // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2)
- am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
- // Call editSchedule once more to make sure still nothing happens
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersUserLimit()
- throws Exception {
- /**
- * Test case: Submit two application (app1/app2) to different queues, queue
- * structure:
- *
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- *
- * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c
- *
- * 1) Two nodes in the cluster, each of them has 4G.
- *
- * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
- * more resource available.
- *
- * 3) app2 submit to queue-c, ask for one 1G container (for AM)
- *
- * Now the cluster is fulfilled.
- *
- * 4) app2 asks for another 1G container, system will preempt one container
- * from app1, and app2 will receive the preempted container
- */
- CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
- csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
- MockRM rm1 = new MockRM(csConf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getUnallocatedResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getUnallocatedResource().getMemory());
-
- // AM asks for a 1 * GB container
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if no container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- // No preemption happens
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
- Assert.assertEquals(0, killableContainers.size());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 containers (nothing preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- private Map<ContainerId, RMContainer> waitKillableContainersSize(
- PreemptionManager pm, String queueName, String partition,
- int expectedSize) throws InterruptedException {
- Map<ContainerId, RMContainer> killableContainers =
- pm.getKillableContainersMap(queueName, partition);
-
- int wait = 0;
- // Wait for at most 5 sec (it should be super fast actually)
- while (expectedSize != killableContainers.size() && wait < 500) {
- killableContainers = pm.getKillableContainersMap(queueName, partition);
- Thread.sleep(10);
- wait++;
- }
-
- Assert.assertEquals(expectedSize, killableContainers.size());
- return killableContainers;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 1612201..5169337 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -100,7 +99,6 @@ public class TestChildQueueOrder {
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
- when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 87a3d51..69b0813 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -151,7 +150,6 @@ public class TestLeafQueue {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
- when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
@@ -3094,7 +3092,6 @@ public class TestLeafQueue {
Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(2 * GB, 2));
- when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
return csContext;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 1ee201d..bbf6e43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -1677,100 +1676,4 @@ public class TestNodeLabelContainerAllocation {
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
}
-
- @Test
- public void testParentQueueMaxCapsAreRespected() throws Exception {
- /*
- * Queue tree:
- * Root
- * / \
- * A B
- * / \
- * A1 A2
- *
- * A has 50% capacity and 50% max capacity (of label=x)
- * A1/A2 has 50% capacity and 100% max capacity (of label=x)
- * Cluster has one node (label=x) with resource = 24G.
- * So we can at most use 12G resources under queueA.
- */
- CapacitySchedulerConfiguration csConf =
- new CapacitySchedulerConfiguration(this.conf);
-
- // Define top-level queues
- csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
- "b"});
- csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
-
- final String A = CapacitySchedulerConfiguration.ROOT + ".a";
- csConf.setCapacity(A, 10);
- csConf.setAccessibleNodeLabels(A, toSet("x"));
- csConf.setCapacityByLabel(A, "x", 50);
- csConf.setMaximumCapacityByLabel(A, "x", 50);
-
- final String B = CapacitySchedulerConfiguration.ROOT + ".b";
- csConf.setCapacity(B, 90);
- csConf.setAccessibleNodeLabels(B, toSet("x"));
- csConf.setCapacityByLabel(B, "x", 50);
- csConf.setMaximumCapacityByLabel(B, "x", 50);
-
- // Define 2nd-level queues
- csConf.setQueues(A, new String[] { "a1",
- "a2"});
-
- final String A1 = A + ".a1";
- csConf.setCapacity(A1, 50);
- csConf.setAccessibleNodeLabels(A1, toSet("x"));
- csConf.setCapacityByLabel(A1, "x", 50);
- csConf.setMaximumCapacityByLabel(A1, "x", 100);
- csConf.setUserLimitFactor(A1, 100.0f);
-
- final String A2 = A + ".a2";
- csConf.setCapacity(A2, 50);
- csConf.setAccessibleNodeLabels(A2, toSet("x"));
- csConf.setCapacityByLabel(A2, "x", 50);
- csConf.setMaximumCapacityByLabel(A2, "x", 100);
- csConf.setUserLimitFactor(A2, 100.0f);
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of(
- NodeLabel.newInstance("x", false)));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
-
- // inject node label manager
- MockRM rm = new MockRM(csConf) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm.getRMContext().setNodeLabelManager(mgr);
- rm.start();
-
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-
- MockNM nm1 =
- new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
- nm1.registerNode();
-
- // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
- RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
- am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
- doNMHeartbeat(rm, nm1.getNodeId(), 10);
- checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
- cs.getApplicationAttempt(am1.getApplicationAttemptId()));
-
- // Try to launch app2 in a2, asked 2GB, should success
- RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
-
- // am2 asks more resources, cannot success because current used = 9G (app1)
- // + 2G (app2) = 11G, and queue's max capacity = 12G
- am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");
-
- doNMHeartbeat(rm, nm1.getNodeId(), 10);
- checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
- cs.getApplicationAttempt(am2.getApplicationAttemptId()));
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 23dc860..f73baa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -93,7 +92,6 @@ public class TestParentQueue {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
- when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 56facee..2ef5e39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -127,7 +126,6 @@ public class TestReservations {
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
- when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
conf);