You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/16 19:56:40 UTC
[21/23] hadoop git commit: YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan
YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/733b0f68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/733b0f68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/733b0f68
Branch: refs/heads/YARN-1197
Commit: 733b0f68061558dc32eddb1f112447f5f4be02d0
Parents: b7c4cd5
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 15 10:21:39 2015 +0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Sep 16 10:55:49 2015 -0700
----------------------------------------------------------------------
.../v2/app/rm/TestRMContainerAllocator.java | 11 +-
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 14 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 13 +
.../sls/scheduler/ResourceSchedulerWrapper.java | 21 +-
.../sls/scheduler/SLSCapacityScheduler.java | 19 +-
hadoop-yarn-project/CHANGES.txt | 3 +
.../api/impl/TestAMRMClientOnRMRestart.java | 8 +-
.../resource/DefaultResourceCalculator.java | 5 +
.../resource/DominantResourceCalculator.java | 6 +
.../yarn/util/resource/ResourceCalculator.java | 5 +
.../hadoop/yarn/util/resource/Resources.java | 5 +
.../util/resource/TestResourceCalculator.java | 30 +-
.../protocolrecords/NodeHeartbeatResponse.java | 5 +-
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 5 +-
.../ApplicationMasterService.java | 22 +-
.../server/resourcemanager/RMAuditLogger.java | 2 +
.../server/resourcemanager/RMServerUtils.java | 164 ++++
.../resourcemanager/ResourceTrackerService.java | 7 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 4 +-
.../rmcontainer/RMContainer.java | 4 +
.../RMContainerChangeResourceEvent.java | 44 +
.../rmcontainer/RMContainerEventType.java | 13 +-
.../rmcontainer/RMContainerImpl.java | 121 ++-
.../RMContainerUpdatesAcquiredEvent.java | 35 +
.../server/resourcemanager/rmnode/RMNode.java | 9 +
.../rmnode/RMNodeDecreaseContainerEvent.java | 39 +
.../resourcemanager/rmnode/RMNodeEventType.java | 1 +
.../resourcemanager/rmnode/RMNodeImpl.java | 93 ++
.../rmnode/RMNodeStatusEvent.java | 32 +-
.../scheduler/AbstractYarnScheduler.java | 150 ++-
.../resourcemanager/scheduler/Allocation.java | 22 +-
.../scheduler/AppSchedulingInfo.java | 249 ++++-
.../resourcemanager/scheduler/QueueMetrics.java | 16 +-
.../scheduler/SchedContainerChangeRequest.java | 118 +++
.../scheduler/SchedulerApplication.java | 2 +-
.../scheduler/SchedulerApplicationAttempt.java | 253 +++--
.../scheduler/SchedulerNode.java | 31 +
.../scheduler/SchedulerUtils.java | 11 +-
.../scheduler/YarnScheduler.java | 14 +-
.../scheduler/capacity/AbstractCSQueue.java | 23 +-
.../scheduler/capacity/CSAssignment.java | 9 +
.../scheduler/capacity/CSQueue.java | 16 +
.../scheduler/capacity/CapacityScheduler.java | 83 +-
.../scheduler/capacity/LeafQueue.java | 127 ++-
.../scheduler/capacity/ParentQueue.java | 115 ++-
.../allocator/AbstractContainerAllocator.java | 131 +++
.../capacity/allocator/ContainerAllocator.java | 149 +--
.../allocator/IncreaseContainerAllocator.java | 365 +++++++
.../allocator/RegularContainerAllocator.java | 30 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 68 +-
.../scheduler/fair/FairScheduler.java | 35 +-
.../scheduler/fifo/FifoScheduler.java | 25 +-
.../server/resourcemanager/Application.java | 2 +-
.../yarn/server/resourcemanager/MockAM.java | 9 +
.../yarn/server/resourcemanager/MockNodes.java | 13 +
.../yarn/server/resourcemanager/MockRM.java | 13 +
.../TestApplicationMasterService.java | 144 ++-
.../applicationsmanager/TestAMRestart.java | 15 +-
.../TestRMAppLogAggregationStatus.java | 10 +-
.../attempt/TestRMAppAttemptTransitions.java | 32 +-
.../rmcontainer/TestRMContainerImpl.java | 117 ++-
.../capacity/TestCapacityScheduler.java | 128 ++-
.../scheduler/capacity/TestChildQueueOrder.java | 4 +-
.../capacity/TestContainerAllocation.java | 50 +-
.../capacity/TestContainerResizing.java | 963 +++++++++++++++++++
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../scheduler/capacity/TestParentQueue.java | 4 +-
.../scheduler/capacity/TestReservations.java | 9 +-
.../scheduler/fair/FairSchedulerTestBase.java | 6 +-
.../fair/TestContinuousScheduling.java | 2 +-
.../scheduler/fair/TestFairScheduler.java | 30 +-
.../scheduler/fifo/TestFifoScheduler.java | 28 +-
72 files changed, 3856 insertions(+), 509 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e148c32..2bb7e27 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -1575,8 +1576,10 @@ public class TestRMContainerAllocator {
@Override
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
- List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ List<ContainerId> release, List<String> blacklistAdditions,
+ List<String> blacklistRemovals,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
@@ -1590,8 +1593,8 @@ public class TestRMContainerAllocator {
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
return super.allocate(
- applicationAttemptId, askCopy, release,
- blacklistAdditions, blacklistRemovals);
+ applicationAttemptId, askCopy, release, blacklistAdditions,
+ blacklistRemovals, increaseRequests, decreaseRequests);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 2d2c3e0..dae2ce7 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -174,6 +175,19 @@ public class NodeInfo {
public Set<String> getNodeLabels() {
return RMNodeLabelsManager.EMPTY_STRING_SET;
}
+
+ @Override
+ public void updateNodeHeartbeatResponseForContainersDecreasing(
+ NodeHeartbeatResponse response) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public List<Container> pullNewlyIncreasedContainers() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index ecc4734..8c65ccc 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -163,4 +164,16 @@ public class RMNodeWrapper implements RMNode {
public Set<String> getNodeLabels() {
return RMNodeLabelsManager.EMPTY_STRING_SET;
}
+
+ @Override
+ public void updateNodeHeartbeatResponseForContainersDecreasing(
+ NodeHeartbeatResponse response) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public List<Container> pullNewlyIncreasedContainers() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 14e2645..310b3b5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -202,15 +204,16 @@ final public class ResourceSchedulerWrapper
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
- List<ResourceRequest> resourceRequests,
- List<ContainerId> containerIds,
- List<String> strings, List<String> strings2) {
+ List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+ List<String> strings, List<String> strings2,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
try {
allocation = scheduler.allocate(attemptId, resourceRequests,
- containerIds, strings, strings2);
+ containerIds, strings, strings2, null, null);
return allocation;
} finally {
context.stop();
@@ -224,7 +227,7 @@ final public class ResourceSchedulerWrapper
}
} else {
return scheduler.allocate(attemptId,
- resourceRequests, containerIds, strings, strings2);
+ resourceRequests, containerIds, strings, strings2, null, null);
}
}
@@ -959,4 +962,12 @@ final public class ResourceSchedulerWrapper
return Priority.newInstance(0);
}
+ @Override
+ protected void decreaseContainer(
+ SchedContainerChangeRequest decreaseRequest,
+ SchedulerApplicationAttempt attempt) {
+ // TODO Auto-generated method stub
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index a4416db..3626027 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -176,15 +177,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
@Override
public Allocation allocate(ApplicationAttemptId attemptId,
- List<ResourceRequest> resourceRequests,
- List<ContainerId> containerIds,
- List<String> strings, List<String> strings2) {
+ List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+ List<String> strings, List<String> strings2,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
try {
- allocation = super.allocate(attemptId, resourceRequests,
- containerIds, strings, strings2);
+ allocation = super
+ .allocate(attemptId, resourceRequests, containerIds, strings,
+ strings2, increaseRequests, decreaseRequests);
return allocation;
} finally {
context.stop();
@@ -197,8 +200,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
}
} else {
- return super.allocate(attemptId,
- resourceRequests, containerIds, strings, strings2);
+ return super.allocate(attemptId, resourceRequests, containerIds, strings,
+ strings2, increaseRequests, decreaseRequests);
}
}
@@ -426,7 +429,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
if (pool != null) pool.shutdown();
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private void initMetrics() throws Exception {
metrics = new MetricRegistry();
// configuration
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d7ff457..5c0f849 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -214,6 +214,9 @@ Release 2.8.0 - UNRELEASED
YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe)
+ YARN-1651. CapacityScheduler side changes to support container resize.
+ (Wangda Tan via jianhe)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 108ad37..2394747 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -525,7 +526,9 @@ public class TestAMRMClientOnRMRestart {
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
- List<String> blacklistRemovals) {
+ List<String> blacklistRemovals,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy =
@@ -539,7 +542,8 @@ public class TestAMRMClientOnRMRestart {
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
return super.allocate(applicationAttemptId, askCopy, release,
- blacklistAdditions, blacklistRemovals);
+ blacklistAdditions, blacklistRemovals, increaseRequests,
+ decreaseRequests);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index c2fc1f0..2fdf214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -110,4 +110,9 @@ public class DefaultResourceCalculator extends ResourceCalculator {
);
}
+ @Override
+ public boolean fitsIn(Resource cluster,
+ Resource smaller, Resource bigger) {
+ return smaller.getMemory() <= bigger.getMemory();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 2ee95ce..b5c9967 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -209,4 +209,10 @@ public class DominantResourceCalculator extends ResourceCalculator {
);
}
+ @Override
+ public boolean fitsIn(Resource cluster,
+ Resource smaller, Resource bigger) {
+ return smaller.getMemory() <= bigger.getMemory()
+ && smaller.getVirtualCores() <= bigger.getVirtualCores();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 442196c..3a31225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -171,4 +171,9 @@ public abstract class ResourceCalculator {
*/
public abstract Resource divideAndCeil(Resource numerator, int denominator);
+ /**
+ * Check if a smaller resource can be contained by bigger resource.
+ */
+ public abstract boolean fitsIn(Resource cluster,
+ Resource smaller, Resource bigger);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 503d456..b05d021 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -267,6 +267,11 @@ public class Resources {
return smaller.getMemory() <= bigger.getMemory() &&
smaller.getVirtualCores() <= bigger.getVirtualCores();
}
+
+ public static boolean fitsIn(ResourceCalculator rc, Resource cluster,
+ Resource smaller, Resource bigger) {
+ return rc.fitsIn(cluster, smaller, bigger);
+ }
public static Resource componentwiseMin(Resource lhs, Resource rhs) {
return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
index 6a0b62e..0654891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
@@ -41,6 +41,35 @@ public class TestResourceCalculator {
public TestResourceCalculator(ResourceCalculator rs) {
this.resourceCalculator = rs;
}
+
+ @Test(timeout = 10000)
+ public void testFitsIn() {
+ Resource cluster = Resource.newInstance(1024, 1);
+
+ if (resourceCalculator instanceof DefaultResourceCalculator) {
+ Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(2, 1)));
+ Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(2, 2)));
+ Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(1, 2)));
+ Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(1, 1)));
+ Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(2, 1), Resource.newInstance(1, 2)));
+ } else if (resourceCalculator instanceof DominantResourceCalculator) {
+ Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(2, 1)));
+ Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(2, 2)));
+ Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(1, 2)));
+ Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(1, 2), Resource.newInstance(1, 1)));
+ Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+ Resource.newInstance(2, 1), Resource.newInstance(1, 2)));
+ }
+ }
@Test(timeout = 10000)
public void testResourceCalculatorCompareMethod() {
@@ -92,7 +121,6 @@ public class TestResourceCalculator {
}
-
private void assertResourcesOperations(Resource clusterResource,
Resource lhs, Resource rhs, boolean lessThan, boolean lessThanOrEqual,
boolean greaterThan, boolean greaterThanOrEqual, Resource max,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 38fbc82..c0ccf57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -73,5 +74,5 @@ public interface NodeHeartbeatResponse {
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
List<Container> getContainersToDecrease();
- void addAllContainersToDecrease(List<Container> containersToDecrease);
+ void addAllContainersToDecrease(Collection<Container> containersToDecrease);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 12c5230..dc65141 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -20,14 +20,15 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
@@ -437,7 +438,7 @@ public class NodeHeartbeatResponsePBImpl extends
@Override
public void addAllContainersToDecrease(
- final List<Container> containersToDecrease) {
+ final Collection<Container> containersToDecrease) {
if (containersToDecrease == null) {
return;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 14142de..87c7bfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -451,11 +451,13 @@ public class ApplicationMasterService extends AbstractService implements
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
}
+
+ Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
// sanity check
try {
RMServerUtils.normalizeAndValidateRequests(ask,
- rScheduler.getMaximumResourceCapability(), app.getQueue(),
+ maximumCapacity, app.getQueue(),
rScheduler, rmContext);
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
@@ -469,6 +471,15 @@ public class ApplicationMasterService extends AbstractService implements
throw e;
}
+ try {
+ RMServerUtils.increaseDecreaseRequestSanityCheck(rmContext,
+ request.getIncreaseRequests(), request.getDecreaseRequests(),
+ maximumCapacity);
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn(e);
+ throw e;
+ }
+
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
@@ -493,8 +504,9 @@ public class ApplicationMasterService extends AbstractService implements
allocation = EMPTY_ALLOCATION;
} else {
allocation =
- this.rScheduler.allocate(appAttemptId, ask, release,
- blacklistAdditions, blacklistRemovals);
+ this.rScheduler.allocate(appAttemptId, ask, release,
+ blacklistAdditions, blacklistRemovals,
+ request.getIncreaseRequests(), request.getDecreaseRequests());
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
@@ -540,6 +552,10 @@ public class ApplicationMasterService extends AbstractService implements
.pullJustFinishedContainers());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit());
+
+ // Handling increased/decreased containers
+ allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers());
+ allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index f049d97..cd9a61d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -56,6 +56,8 @@ public class RMAuditLogger {
public static final String RELEASE_CONTAINER = "AM Released Container";
public static final String UPDATE_APP_PRIORITY =
"Update Application Priority Request";
+ public static final String CHANGE_CONTAINER_RESOURCE =
+ "AM Changed Container Resource";
// Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 4d2e41c..cc30593 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +36,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -49,10 +52,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -107,6 +114,89 @@ public class RMServerUtils {
queueName, scheduler, rmContext, queueInfo);
}
}
+
+ /**
+ * Validate and normalize a container increase/decrease request; the
+ * normalized target resource is written back to the request's capability.
+ *
+ * <pre>
+ * - Throws InvalidResourceRequestException when any check fails
+ * </pre>
+ */
+ public static void checkAndNormalizeContainerChangeRequest(
+ RMContext rmContext, ContainerResourceChangeRequest request,
+ boolean increase) throws InvalidResourceRequestException {
+ ContainerId containerId = request.getContainerId();
+ ResourceScheduler scheduler = rmContext.getScheduler();
+ RMContainer rmContainer = scheduler.getRMContainer(containerId);
+ ResourceCalculator rc = scheduler.getResourceCalculator();
+
+ if (null == rmContainer) {
+ String msg =
+ "Failed to get rmContainer for "
+ + (increase ? "increase" : "decrease")
+ + " request, with container-id=" + containerId;
+ throw new InvalidResourceRequestException(msg);
+ }
+
+ if (rmContainer.getState() != RMContainerState.RUNNING) {
+ String msg =
+ "rmContainer's state is not RUNNING, for "
+ + (increase ? "increase" : "decrease")
+ + " request, with container-id=" + containerId;
+ throw new InvalidResourceRequestException(msg);
+ }
+
+ Resource targetResource = Resources.normalize(rc, request.getCapability(),
+ scheduler.getMinimumResourceCapability(),
+ scheduler.getMaximumResourceCapability(),
+ scheduler.getMinimumResourceCapability());
+
+ // Compare targetResource and original resource
+ Resource originalResource = rmContainer.getAllocatedResource();
+
+ // Resource comparison should be >= (or <=) for all resource vectors; for
+ // example, you cannot change a <10G, 10> container to a target resource of
+ // <20G, 8>
+ if (increase) {
+ if (originalResource.getMemory() > targetResource.getMemory()
+ || originalResource.getVirtualCores() > targetResource
+ .getVirtualCores()) {
+ String msg =
+ "Trying to increase a container, but target resource has some"
+ + " resource < original resource, target=" + targetResource
+ + " original=" + originalResource + " containerId="
+ + containerId;
+ throw new InvalidResourceRequestException(msg);
+ }
+ } else {
+ if (originalResource.getMemory() < targetResource.getMemory()
+ || originalResource.getVirtualCores() < targetResource
+ .getVirtualCores()) {
+ String msg =
+ "Trying to decrease a container, but target resource has "
+ + "some resource > original resource, target=" + targetResource
+ + " original=" + originalResource + " containerId="
+ + containerId;
+ throw new InvalidResourceRequestException(msg);
+ }
+ }
+
+ RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode());
+
+ // Target resource of the request is more than the node's total capability
+ if (!Resources.fitsIn(scheduler.getResourceCalculator(),
+ scheduler.getClusterResource(), targetResource,
+ rmNode.getTotalCapability())) {
+ String msg = "Target resource=" + targetResource + " of containerId="
+ + containerId + " is more than node's total resource="
+ + rmNode.getTotalCapability();
+ throw new InvalidResourceRequestException(msg);
+ }
+
+ // Update normalized target resource
+ request.setCapability(targetResource);
+ }
/*
* @throw <code>InvalidResourceBlacklistRequestException </code> if the
@@ -123,6 +213,80 @@ public class RMServerUtils {
}
}
}
+
+ /**
+ * Sanity-check increase/decrease requests. A request is rejected if:
+ * - The same containerId appears more than once across both lists
+ * - A target resource is negative or exceeds maximumAllocation
+ */
+ public static void increaseDecreaseRequestSanityCheck(RMContext rmContext,
+ List<ContainerResourceChangeRequest> incRequests,
+ List<ContainerResourceChangeRequest> decRequests,
+ Resource maximumAllocation) throws InvalidResourceRequestException {
+ checkDuplicatedIncreaseDecreaseRequest(incRequests, decRequests);
+ validateIncreaseDecreaseRequest(rmContext, incRequests, maximumAllocation,
+ true);
+ validateIncreaseDecreaseRequest(rmContext, decRequests, maximumAllocation,
+ false);
+ }
+
+ private static void checkDuplicatedIncreaseDecreaseRequest(
+ List<ContainerResourceChangeRequest> incRequests,
+ List<ContainerResourceChangeRequest> decRequests)
+ throws InvalidResourceRequestException {
+ String msg = "There're multiple increase or decrease container requests "
+ + "for same containerId=";
+ Set<ContainerId> existedContainerIds = new HashSet<ContainerId>();
+ if (incRequests != null) {
+ for (ContainerResourceChangeRequest r : incRequests) {
+ if (!existedContainerIds.add(r.getContainerId())) {
+ throw new InvalidResourceRequestException(msg + r.getContainerId());
+ }
+ }
+ }
+
+ if (decRequests != null) {
+ for (ContainerResourceChangeRequest r : decRequests) {
+ if (!existedContainerIds.add(r.getContainerId())) {
+ throw new InvalidResourceRequestException(msg + r.getContainerId());
+ }
+ }
+ }
+ }
+
+ private static void validateIncreaseDecreaseRequest(RMContext rmContext,
+ List<ContainerResourceChangeRequest> requests, Resource maximumAllocation,
+ boolean increase)
+ throws InvalidResourceRequestException {
+ if (requests == null) {
+ return;
+ }
+ for (ContainerResourceChangeRequest request : requests) {
+ if (request.getCapability().getMemory() < 0
+ || request.getCapability().getMemory() > maximumAllocation
+ .getMemory()) {
+ throw new InvalidResourceRequestException("Invalid "
+ + (increase ? "increase" : "decrease") + " request"
+ + ", requested memory < 0"
+ + ", or requested memory > max configured" + ", requestedMemory="
+ + request.getCapability().getMemory() + ", maxMemory="
+ + maximumAllocation.getMemory());
+ }
+ if (request.getCapability().getVirtualCores() < 0
+ || request.getCapability().getVirtualCores() > maximumAllocation
+ .getVirtualCores()) {
+ throw new InvalidResourceRequestException("Invalid "
+ + (increase ? "increase" : "decrease") + " request"
+ + ", requested virtual cores < 0"
+ + ", or requested virtual cores > max configured"
+ + ", requestedVirtualCores="
+ + request.getCapability().getVirtualCores() + ", maxVirtualCores="
+ + maximumAllocation.getVirtualCores());
+ }
+
+ checkAndNormalizeContainerChangeRequest(rmContext, request, increase);
+ }
+ }
/**
* It will validate to make sure all the containers belong to correct
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 100e991..557f6d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -449,6 +449,8 @@ public class ResourceTrackerService extends AbstractService implements
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
+ rmNode.updateNodeHeartbeatResponseForContainersDecreasing(
+ nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
@@ -461,8 +463,9 @@ public class ResourceTrackerService extends AbstractService implements
// 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
- remoteNodeStatus.getContainersStatuses(),
- remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
+ remoteNodeStatus.getContainersStatuses(),
+ remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse,
+ remoteNodeStatus.getIncreasedContainers());
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 629b2a3..43de3ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -971,7 +971,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST,
amBlacklist.getAdditions(),
- amBlacklist.getRemovals());
+ amBlacklist.getRemovals(), null, null);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
@@ -995,7 +995,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
- null);
+ null, null, null);
// There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
// and is put in SchedulerApplication#newlyAllocatedContainers.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 21d79ee..dc0d9ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -82,4 +82,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
String getNodeHttpAddress();
String getNodeLabelExpression();
+
+ boolean hasIncreaseReservation();
+
+ void cancelIncreaseReservation();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
new file mode 100644
index 0000000..920cfdb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class RMContainerChangeResourceEvent extends RMContainerEvent {
+
+ final Resource targetResource;
+ final boolean increase;
+
+ public RMContainerChangeResourceEvent(ContainerId containerId,
+ Resource targetResource, boolean increase) {
+ super(containerId, RMContainerEventType.CHANGE_RESOURCE);
+
+ this.targetResource = targetResource;
+ this.increase = increase;
+ }
+
+ public Resource getTargetResource() {
+ return targetResource;
+ }
+
+ public boolean isIncrease() {
+ return increase;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
index 259d68b3..a3b4b76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
@@ -25,6 +25,10 @@ public enum RMContainerEventType {
ACQUIRED,
KILL, // Also from Node on NodeRemoval
RESERVED,
+
+ // when a container is acquired by the AM after
+ // it has been increased/decreased
+ ACQUIRE_UPDATED_CONTAINER,
LAUNCHED,
FINISHED,
@@ -35,5 +39,12 @@ public enum RMContainerEventType {
// Source: ContainerAllocationExpirer
EXPIRE,
- RECOVER
+ RECOVER,
+
+ // Source: Scheduler
+ // Resource change approved by scheduler
+ CHANGE_RESOURCE,
+
+ // NM reported resource change is done
+ NM_DONE_CHANGE_RESOURCE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index a3d8bee..8133657 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -118,7 +118,18 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
.addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
- RMContainerEventType.EXPIRE)
+ RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED,
+ RMContainerEventType.EXPIRE,
+ new ContainerExpiredWhileRunningTransition())
+ .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+ RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
+ .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+ RMContainerEventType.ACQUIRE_UPDATED_CONTAINER,
+ new ContainerAcquiredWhileRunningTransition())
+ .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+ RMContainerEventType.NM_DONE_CHANGE_RESOURCE,
+ new NMReportedContainerChangeIsDoneTransition())
// Transitions from COMPLETED state
.addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED,
@@ -140,9 +151,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
RMContainerEventType.KILL, RMContainerEventType.FINISHED))
// create the topology tables
- .installTopology();
-
-
+ .installTopology();
private final StateMachine<RMContainerState, RMContainerEventType,
RMContainerEvent> stateMachine;
@@ -166,6 +175,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
private ContainerStatus finishedStatus;
private boolean isAMContainer;
private List<ResourceRequest> resourceRequests;
+
+ private volatile boolean hasIncreaseReservation = false;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -264,7 +275,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
@Override
public Resource getAllocatedResource() {
- return container.getResource();
+ try {
+ readLock.lock();
+ return container.getResource();
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -471,8 +487,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
}
- private static final class ContainerReservedTransition extends
- BaseTransition {
+ private static final class ContainerReservedTransition
+ extends BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
@@ -480,6 +496,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
container.reservedResource = e.getReservedResource();
container.reservedNode = e.getReservedNode();
container.reservedPriority = e.getReservedPriority();
+
+ if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
+ .contains(container.getState())) {
+ // When container's state != NEW/RESERVED, it is an increase reservation
+ container.hasIncreaseReservation = true;
+ }
}
}
@@ -509,6 +531,70 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
.getApplicationAttemptId().getApplicationId(), container.nodeId));
}
}
+
+ private static final class ContainerAcquiredWhileRunningTransition extends
+ BaseTransition {
+
+ @Override
+ public void transition(RMContainerImpl container, RMContainerEvent event) {
+ RMContainerUpdatesAcquiredEvent acquiredEvent =
+ (RMContainerUpdatesAcquiredEvent) event;
+ if (acquiredEvent.isIncreasedContainer()) {
+ // If container is increased but not acquired by AM, we will start
+ // containerAllocationExpirer for this container in this transition.
+ container.containerAllocationExpirer.register(event.getContainerId());
+ }
+ }
+ }
+
+ private static final class NMReportedContainerChangeIsDoneTransition
+ extends BaseTransition {
+
+ @Override
+ public void transition(RMContainerImpl container, RMContainerEvent event) {
+ // Unregister from the allocation expirer; NM reported the change is done.
+ container.containerAllocationExpirer.unregister(event.getContainerId());
+ }
+ }
+
+ private static final class ContainerExpiredWhileRunningTransition extends
+ BaseTransition {
+
+ @Override
+ public void transition(RMContainerImpl container, RMContainerEvent event) {
+ // When the container expired, and it has a pending increased request, we
+ // will kill the container.
+ // TODO, we can do better for this: roll back container resource to the
+ // resource before increase, and notify scheduler about this decrease as
+ // well. Will do that in a separate JIRA.
+ new KillTransition().transition(container, event);
+ }
+ }
+
+ private static final class ChangeResourceTransition extends BaseTransition {
+
+ @Override
+ public void transition(RMContainerImpl container, RMContainerEvent event) {
+ RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
+
+ // Register with containerAllocationExpirer.
+ // For now, we assume the timeout for an increase is the same as for
+ // container allocation.
+ if (!changeEvent.isIncrease()) {
+ // If this is a decrease request and the container was increased but the
+ // NM has not been told yet, consider the previous increase cancelled and
+ // unregister from the containerAllocationExpirer.
+ container.containerAllocationExpirer.unregister(container
+ .getContainerId());
+ }
+
+ container.container.setResource(changeEvent.getTargetResource());
+
+ // Reaching here means we either allocated the increase reservation or
+ // decreased the container; either way the reservation is cancelled.
+ container.hasIncreaseReservation = false;
+ }
+ }
private static final class ContainerRescheduledTransition extends
FinishedTransition {
@@ -561,13 +647,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
RMAppAttempt rmAttempt = container.rmContext.getRMApps()
.get(container.getApplicationAttemptId().getApplicationId())
.getCurrentAppAttempt();
- if (ContainerExitStatus.PREEMPTED == container.finishedStatus
- .getExitStatus()) {
- rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
- container);
- }
if (rmAttempt != null) {
+ if (ContainerExitStatus.PREEMPTED == container.finishedStatus
+ .getExitStatus()) {
+ rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
+ container);
+ }
+
long usedMillis = container.finishTime - container.creationTime;
long memorySeconds = resource.getMemory()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
@@ -665,4 +752,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
return -1;
}
+
+ @Override
+ public boolean hasIncreaseReservation() {
+ return hasIncreaseReservation;
+ }
+
+ @Override
+ public void cancelIncreaseReservation() {
+ hasIncreaseReservation = false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java
new file mode 100644
index 0000000..0dccc5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class RMContainerUpdatesAcquiredEvent extends RMContainerEvent {
+ private final boolean increasedContainer;
+
+ public RMContainerUpdatesAcquiredEvent(ContainerId containerId,
+ boolean increasedContainer) {
+ super(containerId, RMContainerEventType.ACQUIRE_UPDATED_CONTAINER);
+ this.increasedContainer = increasedContainer;
+ }
+
+ public boolean isIncreasedContainer() {
+ return increasedContainer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 6bb0971..f28422a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -146,4 +147,12 @@ public interface RMNode {
* @return labels in this node
*/
public Set<String> getNodeLabels();
+
+ /**
+ * Add the containers to be decreased on this node to the given heartbeat
+ * response.
+ */
+ public void updateNodeHeartbeatResponseForContainersDecreasing(
+ NodeHeartbeatResponse response);
+
+ /**
+  * Retrieve the containers whose increase has been reported for this node
+  * since the previous call. The implementation in RMNodeImpl hands each
+  * reported container out once and forgets it afterwards, so calling this
+  * method consumes the pending entries.
+  *
+  * @return the newly reported increased containers; empty when there are none
+  */
+ public List<Container> pullNewlyIncreasedContainers();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
new file mode 100644
index 0000000..62925ad
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Node event of type {@link RMNodeEventType#DECREASE_CONTAINER} that carries
+ * the list of containers to be decreased on the target node.
+ */
+public class RMNodeDecreaseContainerEvent extends RMNodeEvent {
+  // NOTE(review): package-visible rather than private. Left unchanged because
+  // other classes in this package may read it directly -- confirm, then
+  // consider narrowing to private.
+  final List<Container> toBeDecreasedContainers;
+
+  /**
+   * @param nodeId node the event is dispatched to
+   * @param toBeDecreasedContainers containers to be decreased; the list is
+   *          stored by reference, it is not copied
+   */
+  public RMNodeDecreaseContainerEvent(NodeId nodeId,
+      List<Container> toBeDecreasedContainers) {
+    super(nodeId, RMNodeEventType.DECREASE_CONTAINER);
+    this.toBeDecreasedContainers = toBeDecreasedContainers;
+  }
+
+  /**
+   * @return the same list instance that was passed to the constructor
+   */
+  public List<Container> getToBeDecreasedContainers() {
+    return this.toBeDecreasedContainers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index 27ba1c0..a68c894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -42,6 +42,7 @@ public enum RMNodeEventType {
// Source: Container
CONTAINER_ALLOCATED,
CLEANUP_CONTAINER,
+ DECREASE_CONTAINER,
// Source: RMAppAttempt
FINISHED_CONTAINERS_PULLED_BY_AM,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7a1ba74..7a43598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -19,9 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -36,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -131,6 +136,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* the list of applications that are running on this node */
private final List<ApplicationId> runningApplications =
new ArrayList<ApplicationId>();
+
+ private final Map<ContainerId, Container> toBeDecreasedContainers =
+ new HashMap<>();
+
+ private final Map<ContainerId, Container> nmReportedIncreasedContainers =
+ new HashMap<>();
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
@@ -178,6 +189,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.DECREASE_CONTAINER,
+ new DecreaseContainersTransition())
//Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
@@ -430,6 +444,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.writeLock.unlock();
}
};
+
+ /**
+  * Test hook exposing the containers currently queued for decrease.
+  *
+  * The result is a snapshot taken while holding the write lock, the same lock
+  * the heartbeat path holds when it drains the queue. Callers therefore never
+  * iterate over the live view of the internal map while it is being cleared,
+  * and cannot modify the internal map through the returned collection.
+  *
+  * @return a copy of the queued containers; empty when nothing is queued
+  */
+ @VisibleForTesting
+ public Collection<Container> getToBeDecreasedContainers() {
+   this.writeLock.lock();
+   try {
+     return new ArrayList<Container>(toBeDecreasedContainers.values());
+   } finally {
+     this.writeLock.unlock();
+   }
+ }
+
+ /**
+  * Passes every container queued in toBeDecreasedContainers to the response
+  * via addAllContainersToDecrease and then empties the queue. Both steps run
+  * under the write lock.
+  *
+  * @param response heartbeat response that receives the queued containers
+  */
+ @Override
+ public void updateNodeHeartbeatResponseForContainersDecreasing(
+     NodeHeartbeatResponse response) {
+   writeLock.lock();
+   try {
+     final Collection<Container> pending = toBeDecreasedContainers.values();
+     response.addAllContainersToDecrease(pending);
+     // Queue is drained only after it has been handed to the response.
+     toBeDecreasedContainers.clear();
+   } finally {
+     writeLock.unlock();
+   }
+ }
@Override
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
@@ -759,6 +791,19 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
}
}
+
+ /**
+  * Transition for {@link RMNodeEventType#DECREASE_CONTAINER} events. Every
+  * container carried by the event is stored in the node's
+  * toBeDecreasedContainers map keyed by container id, so a later entry for
+  * the same id replaces the earlier one.
+  */
+ public static class DecreaseContainersTransition
+     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+   @Override
+   public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+     // The event is expected to be an RMNodeDecreaseContainerEvent; anything
+     // else fails the cast.
+     final List<Container> incoming =
+         ((RMNodeDecreaseContainerEvent) event).getToBeDecreasedContainers();
+     final Map<ContainerId, Container> queue = rmNode.toBeDecreasedContainers;
+
+     for (final Container toDecrease : incoming) {
+       queue.put(toDecrease.getId(), toDecrease);
+     }
+   }
+ }
public static class DeactivateNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -827,6 +872,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
rmNode.handleContainerStatus(statusEvent.getContainers());
+ rmNode.handleReportedIncreasedContainers(
+ statusEvent.getNMReportedIncreasedContainers());
List<LogAggregationReport> logAggregationReportsForApps =
statusEvent.getLogAggregationReportsForApps();
@@ -919,6 +966,34 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
return nlm.getLabelsOnNode(nodeId);
}
+
+ private void handleReportedIncreasedContainers(
+ List<Container> reportedIncreasedContainers) {
+ for (Container container : reportedIncreasedContainers) {
+ ContainerId containerId = container.getId();
+
+ // Don't bother with containers already scheduled for cleanup, or for
+ // applications already killed. The scheduler doens't need to know any
+ // more about this container
+ if (containersToClean.contains(containerId)) {
+ LOG.info("Container " + containerId + " already scheduled for "
+ + "cleanup, no further processing");
+ continue;
+ }
+
+ ApplicationId containerAppId =
+ containerId.getApplicationAttemptId().getApplicationId();
+
+ if (finishedApplications.contains(containerAppId)) {
+ LOG.info("Container " + containerId
+ + " belongs to an application that is already killed,"
+ + " no further processing");
+ continue;
+ }
+
+ this.nmReportedIncreasedContainers.put(containerId, container);
+ }
+ }
private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
// Filter the map to only obtain just launched containers and finished
@@ -989,4 +1064,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
+ /**
+  * Hands out the increased containers reported so far and forgets them, so
+  * each reported container is returned by exactly one call.
+  *
+  * @return a new list holding the pending containers, or an immutable empty
+  *         list when nothing is pending
+  */
+ @Override
+ public List<Container> pullNewlyIncreasedContainers() {
+   // Take the lock before entering try: if acquisition itself failed, the
+   // finally block must not unlock a lock this thread does not hold.
+   writeLock.lock();
+   try {
+     if (nmReportedIncreasedContainers.isEmpty()) {
+       // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+       return Collections.emptyList();
+     }
+
+     // Copy first: values() is a live view and would be emptied by clear().
+     List<Container> pulled =
+         new ArrayList<Container>(nmReportedIncreasedContainers.values());
+     nmReportedIncreasedContainers.clear();
+     return pulled;
+   } finally {
+     writeLock.unlock();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index b95d7d3..8323f3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+import java.util.Collections;
import java.util.List;
+
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -33,28 +36,36 @@ public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
private List<LogAggregationReport> logAggregationReportsForApps;
-
+ private final List<Container> nmReportedIncreasedContainers;
+
+ // Used by tests
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
NodeHeartbeatResponse latestResponse) {
- super(nodeId, RMNodeEventType.STATUS_UPDATE);
- this.nodeHealthStatus = nodeHealthStatus;
- this.containersCollection = collection;
- this.keepAliveAppIds = keepAliveAppIds;
- this.latestResponse = latestResponse;
- this.logAggregationReportsForApps = null;
+ this(nodeId, nodeHealthStatus, collection, keepAliveAppIds,
+ latestResponse, null);
}
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
NodeHeartbeatResponse latestResponse,
- List<LogAggregationReport> logAggregationReportsForApps) {
+ List<Container> nmReportedIncreasedContainers) {
+ this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, latestResponse,
+ null, nmReportedIncreasedContainers);
+ }
+
+ public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
+ List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+ NodeHeartbeatResponse latestResponse,
+ List<LogAggregationReport> logAggregationReportsForApps,
+ List<Container> nmReportedIncreasedContainers) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
this.logAggregationReportsForApps = logAggregationReportsForApps;
+ this.nmReportedIncreasedContainers = nmReportedIncreasedContainers;
}
public NodeHealthStatus getNodeHealthStatus() {
@@ -81,4 +92,9 @@ public class RMNodeStatusEvent extends RMNodeEvent {
List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps;
}
+
+ /**
+  * @return the increased containers supplied at construction time, or an
+  *         immutable empty list when null was supplied; never null
+  */
+ public List<Container> getNMReportedIncreasedContainers() {
+   if (nmReportedIncreasedContainers == null) {
+     // Type-safe replacement for the raw Collections.EMPTY_LIST constant,
+     // which triggers an unchecked conversion to List<Container>.
+     return Collections.emptyList();
+   }
+   return nmReportedIncreasedContainers;
+ }
}
\ No newline at end of file