You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:07:48 UTC
[26/50] [abbrv] hadoop git commit: YARN-4511. Common scheduler
changes to support scheduler-specific oversubscription implementations.
YARN-4511. Common scheduler changes to support scheduler-specific oversubscription implementations.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce4c4a70
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce4c4a70
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce4c4a70
Branch: refs/heads/YARN-1011
Commit: ce4c4a70839a66f80081ff5295266759b466a158
Parents: d7cdf2c
Author: Haibo Chen <ha...@apache.org>
Authored: Thu Nov 2 09:12:19 2017 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 21 14:56:07 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 6 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 6 +
.../resourcemanager/ResourceTrackerService.java | 3 +-
.../monitor/capacity/TempSchedulerNode.java | 2 +-
.../server/resourcemanager/rmnode/RMNode.java | 7 +
.../resourcemanager/rmnode/RMNodeImpl.java | 13 +-
.../scheduler/AbstractYarnScheduler.java | 4 +-
.../scheduler/ClusterNodeTracker.java | 6 +-
.../scheduler/SchedulerNode.java | 323 +++++++++++----
.../scheduler/SchedulerNodeReport.java | 4 +-
.../scheduler/capacity/CapacityScheduler.java | 2 +-
.../allocator/RegularContainerAllocator.java | 4 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 2 +-
.../common/fica/FiCaSchedulerNode.java | 11 +-
.../scheduler/fair/FSPreemptionThread.java | 2 +-
.../scheduler/fair/FSSchedulerNode.java | 9 +-
.../yarn/server/resourcemanager/MockNodes.java | 6 +
.../TestWorkPreservingRMRestart.java | 39 +-
...alCapacityPreemptionPolicyMockFramework.java | 2 +-
...alCapacityPreemptionPolicyMockFramework.java | 6 +-
.../scheduler/TestAbstractYarnScheduler.java | 6 +-
.../scheduler/TestSchedulerNode.java | 393 +++++++++++++++++++
.../capacity/TestCapacityScheduler.java | 2 +-
.../TestCapacitySchedulerAsyncScheduling.java | 10 +-
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../TestNodeLabelContainerAllocation.java | 14 +-
.../fair/TestContinuousScheduling.java | 42 +-
.../scheduler/fair/TestFSSchedulerNode.java | 18 +-
.../scheduler/fair/TestFairScheduler.java | 14 +-
.../scheduler/fifo/TestFifoScheduler.java | 4 +-
30 files changed, 795 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 2eee351..350f4a3 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -203,6 +204,11 @@ public class NodeInfo {
}
@Override
+ public OverAllocationInfo getOverAllocationInfo() {
+ return null;
+ }
+
+ @Override
public long getUntrackedTimeStamp() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index c73fb15..bb6fb9d 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -190,6 +191,11 @@ public class RMNodeWrapper implements RMNode {
}
@Override
+ public OverAllocationInfo getOverAllocationInfo() {
+ return node.getOverAllocationInfo();
+ }
+
+ @Override
public long getUntrackedTimeStamp() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index b67172e..2edc050 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -397,7 +397,8 @@ public class ResourceTrackerService extends AbstractService implements
.getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
- resolve(host), capability, nodeManagerVersion, physicalResource);
+ resolve(host), capability, nodeManagerVersion, physicalResource,
+ request.getOverAllocationInfo());
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
index 320f262..25777af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
@@ -51,7 +51,7 @@ public class TempSchedulerNode {
public static TempSchedulerNode fromSchedulerNode(
FiCaSchedulerNode schedulerNode) {
TempSchedulerNode n = new TempSchedulerNode();
- n.totalResource = Resources.clone(schedulerNode.getTotalResource());
+ n.totalResource = Resources.clone(schedulerNode.getCapacity());
n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource());
n.runningContainers = schedulerNode.getCopiedListOfRunningContainers();
n.reservedContainer = schedulerNode.getReservedContainer();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index c77d29c..aa19483 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
/**
* Node managers information on available resources
@@ -117,6 +118,12 @@ public interface RMNode {
public ResourceUtilization getNodeUtilization();
/**
+ * Get the node overallocation threshold.
+ * @return the overallocation threshold
+ */
+ OverAllocationInfo getOverAllocationInfo();
+
+ /**
* the physical resources in the node.
* @return the physical resources in the node.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 65a0c20..b53b32b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@@ -112,6 +113,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final WriteLock writeLock;
private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
+ private final OverAllocationInfo overallocationInfo;
private volatile boolean nextHeartBeat = true;
private final NodeId nodeId;
@@ -367,12 +369,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion) {
this(nodeId, context, hostName, cmPort, httpPort, node, capability,
- nodeManagerVersion, null);
+ nodeManagerVersion, null, null);
}
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability,
- String nodeManagerVersion, Resource physResource) {
+ String nodeManagerVersion, Resource physResource,
+ OverAllocationInfo overAllocationInfo) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
@@ -387,6 +390,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.physicalResource = physResource;
+ this.overallocationInfo = overAllocationInfo;
this.latestNodeHeartBeatResponse.setResponseId(0);
@@ -536,6 +540,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
+ @Override
+ public OverAllocationInfo getOverAllocationInfo() {
+ return this.overallocationInfo;
+ }
+
public void setNodeUtilization(ResourceUtilization nodeUtilization) {
this.writeLock.lock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 9d2b058..6b1fdcb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -357,7 +357,7 @@ public abstract class AbstractYarnScheduler
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
- node.containerStarted(containerId);
+ node.containerLaunched(containerId);
} finally {
readLock.unlock();
}
@@ -825,7 +825,7 @@ public abstract class AbstractYarnScheduler
writeLock.lock();
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
- Resource oldResource = node.getTotalResource();
+ Resource oldResource = node.getCapacity();
if (!oldResource.equals(newResource)) {
// Notify NodeLabelsManager about this change
rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 8c7e447..90b1c3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -102,7 +102,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
nodesList.add(node);
// Update cluster capacity
- Resources.addTo(clusterCapacity, node.getTotalResource());
+ Resources.addTo(clusterCapacity, node.getCapacity());
staleClusterCapacity = Resources.clone(clusterCapacity);
// Update maximumAllocation
@@ -197,7 +197,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
}
// Update cluster capacity
- Resources.subtractFrom(clusterCapacity, node.getTotalResource());
+ Resources.subtractFrom(clusterCapacity, node.getCapacity());
staleClusterCapacity = Resources.clone(clusterCapacity);
// Update maximumAllocation
@@ -259,7 +259,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
}
private void updateMaxResources(SchedulerNode node, boolean add) {
- Resource totalResource = node.getTotalResource();
+ Resource totalResource = node.getCapacity();
ResourceInformation[] totalResources;
if (totalResource != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index b35aeba..8f5ae0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -60,24 +61,33 @@ public abstract class SchedulerNode {
private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
+ private Resource capacity;
private Resource unallocatedResource = Resource.newInstance(0, 0);
- private Resource allocatedResource = Resource.newInstance(0, 0);
- private Resource totalResource;
+
private RMContainer reservedContainer;
- private volatile int numContainers;
private volatile ResourceUtilization containersUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
private volatile ResourceUtilization nodeUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
- /* set of containers that are allocated containers */
- private final Map<ContainerId, ContainerInfo> launchedContainers =
- new HashMap<>();
+ private final Map<ContainerId, ContainerInfo>
+ allocatedContainers = new HashMap<>();
+
+ private volatile int numGuaranteedContainers = 0;
+ private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0);
+
+ private volatile int numOpportunisticContainers = 0;
+ private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0);
private final RMNode rmNode;
private final String nodeName;
private final RMContext rmContext;
+ // The total amount of resources requested by containers that have been
+ // allocated but not yet launched on the node.
+ protected Resource resourceAllocatedPendingLaunch =
+ Resource.newInstance(0, 0);
+
private volatile Set<String> labels = null;
private volatile Set<NodeAttribute> nodeAttributes = null;
@@ -90,7 +100,7 @@ public abstract class SchedulerNode {
this.rmNode = node;
this.rmContext = node.getRMContext();
this.unallocatedResource = Resources.clone(node.getTotalCapability());
- this.totalResource = Resources.clone(node.getTotalCapability());
+ this.capacity = Resources.clone(node.getTotalCapability());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
@@ -113,9 +123,9 @@ public abstract class SchedulerNode {
* @param resource Total resources on the node.
*/
public synchronized void updateTotalResource(Resource resource){
- this.totalResource = resource;
- this.unallocatedResource = Resources.subtract(totalResource,
- this.allocatedResource);
+ this.capacity = Resources.clone(resource);
+ this.unallocatedResource = Resources.subtract(capacity,
+ this.allocatedResourceGuaranteed);
}
/**
@@ -174,17 +184,83 @@ public abstract class SchedulerNode {
protected synchronized void allocateContainer(RMContainer rmContainer,
boolean launchedOnNode) {
Container container = rmContainer.getContainer();
- if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
- deductUnallocatedResource(container.getResource());
- ++numContainers;
+
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+ guaranteedContainerResourceAllocated(rmContainer, launchedOnNode);
+ } else {
+ opportunisticContainerResourceAllocated(rmContainer, launchedOnNode);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigned container " + container.getId() + " of capacity "
+ + container.getResource() + " and type " +
+ container.getExecutionType() + " on host " + toString());
+ }
+ }
+
+ /**
+ * Handle an allocation of a GUARANTEED container.
+ * @param rmContainer the allocated GUARANTEED container
+ * @param launchedOnNode true if the container has been launched
+ */
+ private void guaranteedContainerResourceAllocated(
+ RMContainer rmContainer, boolean launchedOnNode) {
+ Container container = rmContainer.getContainer();
+
+ if (container.getExecutionType() != ExecutionType.GUARANTEED) {
+ throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+ container.getExecutionType());
+ }
+
+ allocatedContainers.put(container.getId(),
+ new ContainerInfo(rmContainer, launchedOnNode));
+
+ Resource resource = container.getResource();
+ if (containerResourceAllocated(resource, allocatedResourceGuaranteed)) {
+ Resources.subtractFrom(unallocatedResource, resource);
+ }
+
+ numGuaranteedContainers++;
+ }
+
+ /**
+ * Handle an allocation of a OPPORTUNISTIC container.
+ * @param rmContainer the allocated OPPORTUNISTIC container
+ * @param launchedOnNode true if the container has been launched
+ */
+ private void opportunisticContainerResourceAllocated(
+ RMContainer rmContainer, boolean launchedOnNode) {
+ Container container = rmContainer.getContainer();
+
+ if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+ throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+ container.getExecutionType());
}
- launchedContainers.put(container.getId(),
+ allocatedContainers.put(rmContainer.getContainerId(),
new ContainerInfo(rmContainer, launchedOnNode));
+ if (containerResourceAllocated(
+ container.getResource(), allocatedResourceOpportunistic)) {
+ // nothing to do here
+ }
+ numOpportunisticContainers++;
+ }
+
+ private boolean containerResourceAllocated(Resource allocated,
+ Resource aggregatedResources) {
+ if (allocated == null) {
+ LOG.error("Invalid deduction of null resource for "
+ + rmNode.getNodeAddress());
+ return false;
+ }
+ Resources.addTo(resourceAllocatedPendingLaunch, allocated);
+ Resources.addTo(aggregatedResources, allocated);
+ return true;
}
+
/**
- * Get unallocated resources on the node.
+ * Get resources that are not allocated to GUARANTEED containers on the node.
* @return Unallocated resources on the node
*/
public synchronized Resource getUnallocatedResource() {
@@ -192,42 +268,57 @@ public abstract class SchedulerNode {
}
/**
- * Get allocated resources on the node.
- * @return Allocated resources on the node
+ * Get resources allocated to GUARANTEED containers on the node.
+ * @return Allocated resources to GUARANTEED containers on the node
*/
public synchronized Resource getAllocatedResource() {
- return this.allocatedResource;
+ return this.allocatedResourceGuaranteed;
+ }
+
+ /**
+ * Get resources allocated to OPPORTUNISTIC containers on the node.
+ * @return Allocated resources to OPPORTUNISTIC containers on the node
+ */
+ public synchronized Resource getOpportunisticResourceAllocated() {
+ return this.allocatedResourceOpportunistic;
+ }
+
+ @VisibleForTesting
+ public synchronized Resource getResourceAllocatedPendingLaunch() {
+ return this.resourceAllocatedPendingLaunch;
}
/**
* Get total resources on the node.
* @return Total resources on the node.
*/
- public synchronized Resource getTotalResource() {
- return this.totalResource;
+ public synchronized Resource getCapacity() {
+ return this.capacity;
}
/**
- * Check if a container is launched by this node.
+ * Check if a GUARANTEED container is launched by this node.
* @return If the container is launched by the node.
*/
- public synchronized boolean isValidContainer(ContainerId containerId) {
- if (launchedContainers.containsKey(containerId)) {
- return true;
- }
- return false;
+ @VisibleForTesting
+ public synchronized boolean isValidGuaranteedContainer(
+ ContainerId containerId) {
+ ContainerInfo containerInfo = allocatedContainers.get(containerId);
+ return containerInfo != null && ExecutionType.GUARANTEED ==
+ containerInfo.container.getExecutionType();
}
/**
- * Update the resources of the node when releasing a container.
- * @param container Container to release.
+ * Check if an OPPORTUNISTIC container is launched by this node.
+ * @param containerId id of the container to check
+ * @return If the container is launched by the node.
*/
- protected synchronized void updateResourceForReleasedContainer(
- Container container) {
- if (container.getExecutionType() == ExecutionType.GUARANTEED) {
- addUnallocatedResource(container.getResource());
- --numContainers;
- }
+ @VisibleForTesting
+ public synchronized boolean isValidOpportunisticContainer(
+ ContainerId containerId) {
+ ContainerInfo containerInfo = allocatedContainers.get(containerId);
+ return containerInfo != null && ExecutionType.OPPORTUNISTIC ==
+ containerInfo.container.getExecutionType();
}
/**
@@ -237,17 +328,30 @@ public abstract class SchedulerNode {
*/
public synchronized void releaseContainer(ContainerId containerId,
boolean releasedByNode) {
- ContainerInfo info = launchedContainers.get(containerId);
- if (info == null) {
+ RMContainer rmContainer = getContainer(containerId);
+ if (rmContainer == null) {
+ LOG.warn("Invalid container " + containerId + " is released.");
+ return;
+ }
+
+ if (!allocatedContainers.containsKey(containerId)) {
+ // do not process if the container is never allocated on the node
return;
}
- if (!releasedByNode && info.launchedOnNode) {
- // wait until node reports container has completed
+
+ if (!releasedByNode &&
+ allocatedContainers.get(containerId).launchedOnNode) {
+ // only process if the container has not been launched on a node
+ // yet or it is released by node.
return;
}
- launchedContainers.remove(containerId);
- Container container = info.container.getContainer();
+ Container container = rmContainer.getContainer();
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+ guaranteedContainerReleased(container);
+ } else {
+ opportunisticContainerReleased(container);
+ }
// We remove allocation tags when a container is actually
// released on NM. This is to avoid running into situation
@@ -260,14 +364,16 @@ public abstract class SchedulerNode {
container.getId(), container.getAllocationTags());
}
- updateResourceForReleasedContainer(container);
-
if (LOG.isDebugEnabled()) {
LOG.debug("Released container " + container.getId() + " of capacity "
- + container.getResource() + " on host " + rmNode.getNodeAddress()
- + ", which currently has " + numContainers + " containers, "
- + getAllocatedResource() + " used and " + getUnallocatedResource()
- + " available" + ", release resources=" + true);
+ + container.getResource() + " on host " + rmNode.getNodeAddress()
+ + ", with " + numGuaranteedContainers
+ + " guaranteed containers taking"
+ + getAllocatedResource() + " and " + numOpportunisticContainers
+ + " opportunistic containers taking "
+ + getOpportunisticResourceAllocated()
+ + " and " + getUnallocatedResource() + " (guaranteed) available"
+ + ", release resources=" + true);
}
}
@@ -275,42 +381,75 @@ public abstract class SchedulerNode {
* Inform the node that a container has launched.
* @param containerId ID of the launched container
*/
- public synchronized void containerStarted(ContainerId containerId) {
- ContainerInfo info = launchedContainers.get(containerId);
- if (info != null) {
+ public synchronized void containerLaunched(ContainerId containerId) {
+ ContainerInfo info = allocatedContainers.get(containerId);
+ if (info != null && !info.launchedOnNode) {
info.launchedOnNode = true;
+ Resources.subtractFrom(resourceAllocatedPendingLaunch,
+ info.container.getContainer().getResource());
}
}
/**
- * Add unallocated resources to the node. This is used when unallocating a
- * container.
- * @param resource Resources to add.
+ * Handle the release of a GUARANTEED container.
+ * @param container Container to release.
*/
- private synchronized void addUnallocatedResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid resource addition of null resource for "
- + rmNode.getNodeAddress());
- return;
+ protected synchronized void guaranteedContainerReleased(
+ Container container) {
+ if (container.getExecutionType() != ExecutionType.GUARANTEED) {
+ throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+ container.getExecutionType());
+ }
+
+ if (containerResourceReleased(container, allocatedResourceGuaranteed)) {
+ Resources.addTo(unallocatedResource, container.getResource());
}
- Resources.addTo(unallocatedResource, resource);
- Resources.subtractFrom(allocatedResource, resource);
+ // do not update allocated containers until the resources of
+ // the container are released because we need to check if we
+ // need to update resourceAllocatedPendingLaunch in case the
+ // container has not been launched on the node.
+ allocatedContainers.remove(container.getId());
+ numGuaranteedContainers--;
}
/**
- * Deduct unallocated resources from the node. This is used when allocating a
- * container.
- * @param resource Resources to deduct.
+ * Handle the release of an OPPORTUNISTIC container.
+ * @param container Container to release.
*/
- @VisibleForTesting
- public synchronized void deductUnallocatedResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid deduction of null resource for "
+ private void opportunisticContainerReleased(
+ Container container) {
+ if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+ throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+ container.getExecutionType());
+ }
+
+ if (containerResourceReleased(container, allocatedResourceOpportunistic)) {
+ // nothing to do here
+ }
+ // do not update allocated containers until the resources of
+ // the container are released because we need to check if we
+ // need to update resourceAllocatedPendingLaunch in case the
+ // container has not been launched on the node.
+ allocatedContainers.remove(container.getId());
+ numOpportunisticContainers--;
+ }
+
+ private boolean containerResourceReleased(Container container,
+ Resource aggregatedResource) {
+ Resource released = container.getResource();
+ if (released == null) {
+ LOG.error("Invalid resource addition of null resource for "
+ rmNode.getNodeAddress());
- return;
+ return false;
}
- Resources.subtractFrom(unallocatedResource, resource);
- Resources.addTo(allocatedResource, resource);
+ Resources.subtractFrom(aggregatedResource, released);
+
+ if (!allocatedContainers.get(container.getId()).launchedOnNode) {
+ // update resourceAllocatedPendingLaunch if the container is has
+ // not yet been launched on the node
+ Resources.subtractFrom(resourceAllocatedPendingLaunch, released);
+ }
+ return true;
}
/**
@@ -330,17 +469,28 @@ public abstract class SchedulerNode {
@Override
public String toString() {
- return "host: " + rmNode.getNodeAddress() + " #containers="
- + getNumContainers() + " available=" + getUnallocatedResource()
- + " used=" + getAllocatedResource();
+ return "host: " + rmNode.getNodeAddress() + " #guaranteed containers=" +
+ getNumGuaranteedContainers() + " #opportunistic containers=" +
+ getNumOpportunisticContainers() + " available=" +
+ getUnallocatedResource() + " used by guaranteed containers=" +
+ allocatedResourceGuaranteed + " used by opportunistic containers=" +
+ allocatedResourceOpportunistic;
+ }
+
+ /**
+ * Get number of active GUARANTEED containers on the node.
+ * @return Number of active OPPORTUNISTIC containers on the node.
+ */
+ public int getNumGuaranteedContainers() {
+ return numGuaranteedContainers;
}
/**
- * Get number of active containers on the node.
- * @return Number of active containers on the node.
+ * Get number of active OPPORTUNISTIC containers on the node.
+ * @return Number of active OPPORTUNISTIC containers on the node.
*/
- public int getNumContainers() {
- return numContainers;
+ public int getNumOpportunisticContainers() {
+ return numOpportunisticContainers;
}
/**
@@ -348,8 +498,8 @@ public abstract class SchedulerNode {
* @return A copy of containers running on the node.
*/
public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
- List<RMContainer> result = new ArrayList<>(launchedContainers.size());
- for (ContainerInfo info : launchedContainers.values()) {
+ List<RMContainer> result = new ArrayList<>(allocatedContainers.size());
+ for (ContainerInfo info : allocatedContainers.values()) {
result.add(info.container);
}
return result;
@@ -359,12 +509,14 @@ public abstract class SchedulerNode {
* Get the containers running on the node with AM containers at the end.
* @return A copy of running containers with AM containers at the end.
*/
- public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
+ public synchronized List<RMContainer>
+ getRunningGuaranteedContainersWithAMsAtTheEnd() {
LinkedList<RMContainer> result = new LinkedList<>();
- for (ContainerInfo info : launchedContainers.values()) {
+ for (ContainerInfo info : allocatedContainers.values()) {
if(info.container.isAMContainer()) {
result.addLast(info.container);
- } else {
+ } else if (info.container.getExecutionType() ==
+ ExecutionType.GUARANTEED){
result.addFirst(info.container);
}
}
@@ -377,12 +529,9 @@ public abstract class SchedulerNode {
* @return The container for the specified container ID
*/
protected synchronized RMContainer getContainer(ContainerId containerId) {
- RMContainer container = null;
- ContainerInfo info = launchedContainers.get(containerId);
- if (info != null) {
- container = info.container;
- }
- return container;
+ ContainerInfo info = allocatedContainers.get(containerId);
+
+ return info != null ? info.container : null;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
index fa71a25..ea30d78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
@@ -31,11 +31,11 @@ public class SchedulerNodeReport {
private final Resource used;
private final Resource avail;
private final int num;
-
+
public SchedulerNodeReport(SchedulerNode node) {
this.used = node.getAllocatedResource();
this.avail = node.getUnallocatedResource();
- this.num = node.getNumContainers();
+ this.num = node.getNumGuaranteedContainers();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 4b274df..0b7115b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1986,7 +1986,7 @@ public class CapacityScheduler extends
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
- schedulerNode.getTotalResource());
+ schedulerNode.getCapacity());
}
// recover attributes from store if any.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index c0a11a0..5be2507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -514,13 +514,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
Resource capability = pendingAsk.getPerAllocationResource();
Resource available = node.getUnallocatedResource();
- Resource totalResource = node.getTotalResource();
+ Resource totalResource = node.getCapacity();
if (!Resources.lessThanOrEqual(rc, clusterResource,
capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for ask : " + pendingAsk
- + " node total capability : " + node.getTotalResource());
+ + " node total capability : " + node.getCapacity());
// Skip this locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 4bfdae9..e6a71f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -1072,7 +1072,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
diagnosticMessageBldr.append(" ( Partition : ");
diagnosticMessageBldr.append(node.getLabels());
diagnosticMessageBldr.append(", Total resource : ");
- diagnosticMessageBldr.append(node.getTotalResource());
+ diagnosticMessageBldr.append(node.getCapacity());
diagnosticMessageBldr.append(", Available resource : ");
diagnosticMessageBldr.append(node.getUnallocatedResource());
diagnosticMessageBldr.append(" ).");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 7277779..52165a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -144,9 +144,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
}
@Override
- protected synchronized void updateResourceForReleasedContainer(
+ protected synchronized void guaranteedContainerReleased(
Container container) {
- super.updateResourceForReleasedContainer(container);
+ super.guaranteedContainerReleased(container);
if (killableContainers.containsKey(container.getId())) {
Resources.subtractFrom(totalKillableResources, container.getResource());
killableContainers.remove(container.getId());
@@ -168,9 +168,10 @@ public class FiCaSchedulerNode extends SchedulerNode {
final Container container = rmContainer.getContainer();
LOG.info("Assigned container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + getRMNode().getNodeAddress()
- + ", which has " + getNumContainers() + " containers, "
- + getAllocatedResource() + " used and " + getUnallocatedResource()
- + " available after allocation");
+ + ", which has " + getNumGuaranteedContainers() + " guaranteed"
+ + " containers using " + getAllocatedResource() + ", "
+ + getNumOpportunisticContainers() + " opportunistic containers"
+ + " using " + getOpportunisticResourceAllocated());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index c32565f..dcb076f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -193,7 +193,7 @@ class FSPreemptionThread extends Thread {
// Figure out list of containers to consider
List<RMContainer> containersToCheck =
- node.getRunningContainersWithAMsAtTheEnd();
+ node.getRunningGuaranteedContainersWithAMsAtTheEnd();
containersToCheck.removeAll(node.getContainersForPreemption());
// Initialize potential with unallocated but not reserved resources
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 44ec9c3..95490f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -242,11 +242,12 @@ public class FSSchedulerNode extends SchedulerNode {
super.allocateContainer(rmContainer, launchedOnNode);
if (LOG.isDebugEnabled()) {
final Container container = rmContainer.getContainer();
- LOG.debug("Assigned container " + container.getId() + " of capacity "
+ LOG.info("Assigned container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + getRMNode().getNodeAddress()
- + ", which has " + getNumContainers() + " containers, "
- + getAllocatedResource() + " used and " + getUnallocatedResource()
- + " available after allocation");
+ + ", which has " + getNumGuaranteedContainers() + " guaranteed "
+ + "containers using " + getAllocatedResource() + ", "
+ + getNumOpportunisticContainers() + " opportunistic containers "
+ + "using " + getOpportunisticResourceAllocated());
}
Resource allocated = rmContainer.getAllocatedResource();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index a871993..ce1d6b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -263,6 +264,11 @@ public class MockNodes {
return this.nodeUtilization;
}
+ @Override
+ public OverAllocationInfo getOverAllocationInfo() {
+ return null;
+ }
+
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return OpportunisticContainersStatus.newInstance();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index a821b0a..df409f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -232,13 +232,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
Resource nmResource =
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
- assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
- assertTrue(schedulerNode1.isValidContainer(runningContainer
- .getContainerId()));
- assertFalse(schedulerNode1.isValidContainer(completedContainer
- .getContainerId()));
+ assertTrue(schedulerNode1.isValidGuaranteedContainer(
+ amContainer.getContainerId()));
+ assertTrue(schedulerNode1.isValidGuaranteedContainer(
+ runningContainer.getContainerId()));
+ assertFalse(schedulerNode1.isValidGuaranteedContainer(
+ completedContainer.getContainerId()));
// 2 launched containers, 1 completed container
- assertEquals(2, schedulerNode1.getNumContainers());
+ assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
assertEquals(Resources.subtract(nmResource, usedResources),
schedulerNode1.getUnallocatedResource());
@@ -389,13 +390,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
Resource nmResource =
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
- assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
- assertTrue(
- schedulerNode1.isValidContainer(runningContainer.getContainerId()));
- assertFalse(
- schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+ assertTrue(schedulerNode1.isValidGuaranteedContainer(
+ amContainer.getContainerId()));
+ assertTrue(schedulerNode1.isValidGuaranteedContainer(
+ runningContainer.getContainerId()));
+ assertFalse(schedulerNode1.isValidGuaranteedContainer(
+ completedContainer.getContainerId()));
// 2 launched containers, 1 completed container
- assertEquals(2, schedulerNode1.getNumContainers());
+ assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
assertEquals(Resources.subtract(nmResource, usedResources),
schedulerNode1.getUnallocatedResource());
@@ -1700,13 +1702,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
Resource nmResource = Resource.newInstance(nm1.getMemory(),
nm1.getvCores());
- assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
- assertTrue(
- schedulerNode1.isValidContainer(runningContainer.getContainerId()));
- assertFalse(
- schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+ assertTrue(schedulerNode1.isValidGuaranteedContainer(
+ amContainer.getContainerId()));
+ assertTrue(schedulerNode1.isValidGuaranteedContainer(
+ runningContainer.getContainerId()));
+ assertFalse(schedulerNode1.isValidGuaranteedContainer(
+ completedContainer.getContainerId()));
// 2 launched containers, 1 completed container
- assertEquals(2, schedulerNode1.getNumContainers());
+ assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
assertEquals(Resources.subtract(nmResource, usedResources),
schedulerNode1.getUnallocatedResource());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index fa66cbc..529b298 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -519,7 +519,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
totalRes = parseResourceFromString(resSring);
}
}
- when(sn.getTotalResource()).thenReturn(totalRes);
+ when(sn.getCapacity()).thenReturn(totalRes);
when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
// TODO, add settings of killable resources when necessary
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 964a230..89fb846 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -237,17 +237,17 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
// Check host resources
Assert.assertEquals(3, this.cs.getAllNodes().size());
SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
- Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+ Assert.assertEquals(100, node1.getCapacity().getMemorySize());
Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
Assert.assertNull(node1.getReservedContainer());
SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
- Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+ Assert.assertEquals(0, node2.getCapacity().getMemorySize());
Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
Assert.assertNotNull(node2.getReservedContainer());
SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
- Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+ Assert.assertEquals(30, node3.getCapacity().getMemorySize());
Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
Assert.assertNull(node3.getReservedContainer());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index ba409b1..0b631a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -289,12 +289,12 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
SchedulerNode mockNode1 = mock(SchedulerNode.class);
when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource);
- when(mockNode1.getTotalResource()).thenReturn(fullResource1);
+ when(mockNode1.getCapacity()).thenReturn(fullResource1);
SchedulerNode mockNode2 = mock(SchedulerNode.class);
when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource);
- when(mockNode2.getTotalResource()).thenReturn(fullResource2);
+ when(mockNode2.getCapacity()).thenReturn(fullResource2);
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
@@ -482,7 +482,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
// mock container start
rm1.getRMContext().getScheduler()
- .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);
+ .getSchedulerNode(nm1.getNodeId()).containerLaunched(cid);
// verifies the allocation is made with correct number of tags
Map<String, Long> nodeTags = rm1.getRMContext()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
new file mode 100644
index 0000000..b04b277
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for SchedulerNode.
+ */
+public class TestSchedulerNode {
+ private final Resource nodeCapacity = Resource.newInstance(1024*10, 4);
+
+ @Test
+ public void testAllocateAndReleaseGuaranteedContainer() {
+ SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+ Resource resource = Resource.newInstance(4096, 1);
+ RMContainer container = createRMContainer(0, resource,
+ ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+ ContainerId containerId = container.getContainerId();
+
+ // allocate a container on the node
+ schedulerNode.allocateContainer(container);
+
+ Assert.assertEquals("The container should have been allocated",
+ resource, schedulerNode.getAllocatedResource());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ Resources.subtract(nodeCapacity, resource),
+ schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The container should have been allocated" +
+ " but not launched", resource,
+ schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidGuaranteedContainer(containerId));
+
+ // launch the container on the node
+ schedulerNode.containerLaunched(containerId);
+
+ Assert.assertEquals("The container should have been launched",
+ Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+ // release the container
+ schedulerNode.releaseContainer(containerId, true);
+ Assert.assertEquals("The container should have been released",
+ 0, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertEquals("The container should have been released",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ }
+
+ @Test
+ public void testAllocateAndReleaseOpportunisticContainer() {
+ SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+ Resource resource = Resource.newInstance(4096, 1);
+ RMContainer container = createRMContainer(0, resource,
+ ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode());
+ ContainerId containerId = container.getContainerId();
+
+ // allocate a container on the node
+ schedulerNode.allocateContainer(container);
+
+ Assert.assertEquals("The container should have been allocated",
+ resource, schedulerNode.getOpportunisticResourceAllocated());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ nodeCapacity, schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The container should have been allocated" +
+ " but not launched", resource,
+ schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumOpportunisticContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidOpportunisticContainer(containerId));
+
+ // launch the container on the node
+ schedulerNode.containerLaunched(containerId);
+
+ Assert.assertEquals("The container should have been launched",
+ Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+ // release the container
+ schedulerNode.releaseContainer(containerId, true);
+ Assert.assertEquals("The container should have been released",
+ 0, schedulerNode.getNumOpportunisticContainers());
+ Assert.assertEquals("The container should have been released",
+ Resources.none(), schedulerNode.getOpportunisticResourceAllocated());
+ Assert.assertFalse("The container should have been released",
+ schedulerNode.isValidOpportunisticContainer(containerId));
+ }
+
+ @Test
+ public void testAllocateAndReleaseContainers() {
+ SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+
+ Resource guaranteedResource = Resource.newInstance(4096, 1);
+ RMContainer guaranteedContainer =
+ createRMContainer(0, guaranteedResource,
+ ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+ ContainerId guaranteedContainerId = guaranteedContainer.getContainerId();
+
+ // allocate a guaranteed container on the node
+ schedulerNode.allocateContainer(guaranteedContainer);
+
+ Assert.assertEquals("The guaranteed container should have been allocated",
+ guaranteedResource, schedulerNode.getAllocatedResource());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ Resources.subtract(nodeCapacity, guaranteedResource),
+ schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The guaranteed container should have been allocated" +
+ " but not launched", guaranteedResource,
+ schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidGuaranteedContainer(guaranteedContainerId));
+
+ Resource opportunisticResource = Resource.newInstance(8192, 4);
+ RMContainer opportunisticContainer =
+ createRMContainer(1, opportunisticResource,
+ ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode());
+ ContainerId opportunisticContainerId =
+ opportunisticContainer.getContainerId();
+
+ // allocate an opportunistic container on the node
+ schedulerNode.allocateContainer(opportunisticContainer);
+
+ Assert.assertEquals("The opportunistic container should have been" +
+ " allocated", opportunisticResource,
+ schedulerNode.getOpportunisticResourceAllocated());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ Resources.subtract(nodeCapacity, guaranteedResource),
+ schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The opportunistic container should also have been" +
+ " allocated but not launched",
+ Resources.add(guaranteedResource, opportunisticResource),
+ schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumOpportunisticContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidOpportunisticContainer(opportunisticContainerId));
+
+ // launch both containers on the node
+ schedulerNode.containerLaunched(guaranteedContainerId);
+ schedulerNode.containerLaunched(opportunisticContainerId);
+
+ Assert.assertEquals("Both containers should have been launched",
+ Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+ // release both containers
+ schedulerNode.releaseContainer(guaranteedContainerId, true);
+ schedulerNode.releaseContainer(opportunisticContainerId, true);
+
+ Assert.assertEquals("The guaranteed container should have been released",
+ 0, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertEquals("The opportunistic container should have been released",
+ 0, schedulerNode.getNumOpportunisticContainers());
+ Assert.assertEquals("The guaranteed container should have been released",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ Assert.assertEquals("The opportunistic container should have been released",
+ Resources.none(), schedulerNode.getOpportunisticResourceAllocated());
+ Assert.assertFalse("The guaranteed container should have been released",
+ schedulerNode.isValidGuaranteedContainer(guaranteedContainerId));
+ Assert.assertFalse("The opportunistic container should have been released",
+ schedulerNode.isValidOpportunisticContainer(opportunisticContainerId));
+ }
+
+ @Test
+ public void testReleaseLaunchedContainerNotAsNode() {
+ SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+ Resource resource = Resource.newInstance(4096, 1);
+ RMContainer container = createRMContainer(0, resource,
+ ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+ ContainerId containerId = container.getContainerId();
+
+ // allocate a container on the node
+ schedulerNode.allocateContainer(container);
+
+ Assert.assertEquals("The container should have been allocated",
+ resource, schedulerNode.getAllocatedResource());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ Resources.subtract(nodeCapacity, resource),
+ schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The container should have been allocated" +
+ " but not launched", resource,
+ schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidGuaranteedContainer(containerId));
+
+ // launch the container on the node
+ schedulerNode.containerLaunched(containerId);
+
+ Assert.assertEquals("The container should have been launched",
+ Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+ // release the container
+ schedulerNode.releaseContainer(containerId, false);
+ Assert.assertEquals("The container should not have been released",
+ 1, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertEquals("The container should not have been released",
+ resource, schedulerNode.getAllocatedResource());
+ Assert.assertTrue("The container should not have been released",
+ schedulerNode.isValidGuaranteedContainer(containerId));
+ }
+
+ @Test
+ public void testReleaseUnlaunchedContainerAsNode() {
+ SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+ Resource resource = Resource.newInstance(4096, 1);
+ RMContainer container = createRMContainer(0, resource,
+ ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+ ContainerId containerId = container.getContainerId();
+
+ // allocate a container on the node
+ schedulerNode.allocateContainer(container);
+
+ Assert.assertEquals("The container should have been allocated",
+ resource, schedulerNode.getAllocatedResource());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ Resources.subtract(nodeCapacity, resource),
+ schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The container should have been allocated" +
+ " but not launched",
+ resource, schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidGuaranteedContainer(containerId));
+
+ // make sure the container is not launched yet
+ Assert.assertEquals("The container should not be launched already",
+ resource, schedulerNode.getResourceAllocatedPendingLaunch());
+
+ // release the container
+ schedulerNode.releaseContainer(containerId, true);
+ Assert.assertEquals("The container should have been released",
+ 0, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertEquals("The container should have been released",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ Assert.assertFalse("The container should have been released",
+ schedulerNode.isValidGuaranteedContainer(containerId));
+ Assert.assertEquals("The container should have been released",
+ Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+ }
+
+ @Test
+ public void testReleaseUnlaunchedContainerNotAsNode() {
+ SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+ Resource resource = Resource.newInstance(4096, 1);
+ RMContainer container = createRMContainer(0, resource,
+ ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+ ContainerId containerId = container.getContainerId();
+
+ // allocate a container on the node
+ schedulerNode.allocateContainer(container);
+
+ Assert.assertEquals("The container should have been allocated",
+ resource, schedulerNode.getAllocatedResource());
+ Assert.assertEquals("Incorrect remaining resource accounted.",
+ Resources.subtract(nodeCapacity, resource),
+ schedulerNode.getUnallocatedResource());
+ Assert.assertEquals("The container should have been allocated" +
+ " but not launched", resource,
+ schedulerNode.getResourceAllocatedPendingLaunch());
+ Assert.assertEquals("The container should have been allocated",
+ 1, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertTrue(
+ schedulerNode.isValidGuaranteedContainer(containerId));
+
+ // make sure the container is not launched yet
+ Assert.assertEquals("The container should not have been launched",
+ resource, schedulerNode.getResourceAllocatedPendingLaunch());
+
+ // release the container
+ schedulerNode.releaseContainer(containerId, false);
+ Assert.assertEquals("The container should have been released",
+ 0, schedulerNode.getNumGuaranteedContainers());
+ Assert.assertEquals("The container should have been released",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ Assert.assertFalse("The container should have been released",
+ schedulerNode.isValidGuaranteedContainer(containerId));
+ Assert.assertEquals("The container should have been released",
+ Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+ }
+
+ private SchedulerNode createSchedulerNode(Resource capacity) {
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+
+ RMNode rmNode = mock(RMNode.class);
+ when(rmNode.getNodeID()).thenReturn(nodeId);
+ when(rmNode.getHostName()).thenReturn(nodeId.getHost());
+ when(rmNode.getTotalCapability()).thenReturn(capacity);
+ when(rmNode.getRackName()).thenReturn("/default");
+ when(rmNode.getHttpAddress()).thenReturn(nodeId.getHost());
+ when(rmNode.getNodeAddress()).thenReturn(nodeId.getHost());
+
+ return new SchedulerNodeForTest(rmNode);
+ }
+
+ private static RMContainerImpl createRMContainer(long containerId,
+ Resource resource, ExecutionType executionType, RMNode node) {
+ Container container =
+ createContainer(containerId, resource, executionType, node);
+
+ Dispatcher dispatcher = new AsyncDispatcher();
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(dispatcher);
+ when(rmContext.getSystemMetricsPublisher()).
+ thenReturn(new NoOpSystemMetricPublisher());
+ when(rmContext.getYarnConfiguration()).
+ thenReturn(new YarnConfiguration());
+ when(rmContext.getContainerAllocationExpirer()).
+ thenReturn(new ContainerAllocationExpirer(dispatcher));
+ when(rmContext.getRMApplicationHistoryWriter()).
+ thenReturn(new RMApplicationHistoryWriter());
+
+ return new RMContainerImpl(container, null,
+ container.getId().getApplicationAttemptId(),
+ node.getNodeID(), "test", rmContext);
+ }
+
+ private static Container createContainer(long containerId, Resource resource,
+ ExecutionType executionType, RMNode node) {
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.
+ newInstance(ApplicationId.newInstance(0, 0), 0);
+ ContainerId cId =
+ ContainerId.newContainerId(appAttemptId, containerId);
+ Container container = Container.newInstance(
+ cId, node.getNodeID(), node.getNodeAddress(), resource,
+ Priority.newInstance(0), null, executionType);
+ return container;
+ }
+
+
+ /**
+ * A test implementation of SchedulerNode for the purpose of testing
+ * SchedulerNode only. Resource reservation is scheduler-dependent,
+ * and therefore not covered here.
+ */
+ private static final class SchedulerNodeForTest extends SchedulerNode {
+ SchedulerNodeForTest(RMNode node) {
+ super(node, false);
+ }
+
+ @Override
+ public void reserveResource(SchedulerApplicationAttempt attempt,
+ SchedulerRequestKey schedulerKey, RMContainer container) {
+ }
+
+ @Override
+ public void unreserveResource(SchedulerApplicationAttempt attempt) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e77d8e2..9e15856 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -4131,7 +4131,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
// Check total resource of scheduler node is also changed to 1 GB 1 core
Resource totalResource =
resourceManager.getResourceScheduler()
- .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+ .getSchedulerNode(nm_0.getNodeId()).getCapacity();
Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
totalResource.getMemorySize());
Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 840d30d..9aeb946 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -229,8 +229,8 @@ public class TestCapacitySchedulerAsyncScheduling {
// nm1 runs 1 container(app1-container_01/AM)
// nm2 runs 1 container(app1-container_02)
- Assert.assertEquals(1, sn1.getNumContainers());
- Assert.assertEquals(1, sn2.getNumContainers());
+ Assert.assertEquals(1, sn1.getNumGuaranteedContainers());
+ Assert.assertEquals(1, sn2.getNumGuaranteedContainers());
// kill app attempt1
scheduler.handle(
@@ -325,8 +325,8 @@ public class TestCapacitySchedulerAsyncScheduling {
// nm1 runs 3 containers(app1-container_01/AM, app1-container_02,
// app2-container_01/AM)
// nm2 runs 1 container(app1-container_03)
- Assert.assertEquals(3, sn1.getNumContainers());
- Assert.assertEquals(1, sn2.getNumContainers());
+ Assert.assertEquals(3, sn1.getNumGuaranteedContainers());
+ Assert.assertEquals(1, sn2.getNumGuaranteedContainers());
// reserve 1 container(app1-container_04) for app1 on nm1
ResourceRequest rr2 = ResourceRequest
@@ -639,7 +639,7 @@ public class TestCapacitySchedulerAsyncScheduling {
// nm1 runs 2 container(container_01/AM, container_02)
allocateAndLaunchContainers(am, nm1, rm, 1,
Resources.createResource(6 * GB), 0, 2);
- Assert.assertEquals(2, sn1.getNumContainers());
+ Assert.assertEquals(2, sn1.getNumGuaranteedContainers());
Assert.assertEquals(1 * GB, sn1.getUnallocatedResource().getMemorySize());
// app asks 5 * 2G container
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce4c4a70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 04bb791..6215ce8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -385,7 +385,7 @@ public class TestLeafQueue {
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(
- (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
+ (int)(node_0.getCapacity().getMemorySize() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
}
@@ -684,7 +684,7 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(0*GB, a.getMetrics().getAllocatedMB());
- assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
+ assertEquals((int)(a.getCapacity() * node_0.getCapacity().getMemorySize()),
a.getMetrics().getAvailableMB());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org