You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/09/06 16:44:00 UTC
[27/50] [abbrv] hadoop git commit: YARN-5221. Expose
UpdateResourceRequest API to allow AM to request for change in container
properties. (asuresh)
YARN-5221. Expose UpdateResourceRequest API to allow AM to request for change in container properties. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6d9cff2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6d9cff2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6d9cff2
Branch: refs/heads/HADOOP-13345
Commit: d6d9cff21b7b6141ed88359652cf22e8973c0661
Parents: 9dcbdbd
Author: Arun Suresh <as...@apache.org>
Authored: Sat Aug 27 15:22:43 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Aug 30 15:52:29 2016 -0700
----------------------------------------------------------------------
.../app/local/TestLocalContainerAllocator.java | 4 +-
.../v2/app/rm/TestRMContainerAllocator.java | 10 +-
.../sls/scheduler/ResourceSchedulerWrapper.java | 6 +-
.../sls/scheduler/SLSCapacityScheduler.java | 6 +-
.../api/protocolrecords/AllocateRequest.java | 64 +---
.../api/protocolrecords/AllocateResponse.java | 76 +++--
.../hadoop/yarn/api/records/Container.java | 24 +-
.../records/ContainerResourceChangeRequest.java | 117 -------
.../yarn/api/records/ContainerUpdateType.java | 45 +++
.../yarn/api/records/UpdateContainerError.java | 119 +++++++
.../api/records/UpdateContainerRequest.java | 218 ++++++++++++
.../yarn/api/records/UpdatedContainer.java | 118 +++++++
.../src/main/proto/yarn_protos.proto | 6 +-
.../src/main/proto/yarn_service_protos.proto | 31 +-
.../distributedshell/ApplicationMaster.java | 4 +-
.../yarn/client/api/async/AMRMClientAsync.java | 9 +-
.../api/async/impl/AMRMClientAsyncImpl.java | 8 +-
.../yarn/client/api/impl/AMRMClientImpl.java | 84 ++---
.../api/async/impl/TestAMRMClientAsync.java | 55 +--
.../yarn/client/api/impl/TestAMRMClient.java | 42 +--
.../api/impl/TestAMRMClientOnRMRestart.java | 14 +-
.../impl/pb/AllocateRequestPBImpl.java | 151 +++------
.../impl/pb/AllocateResponsePBImpl.java | 192 ++++++++---
.../api/records/impl/pb/ContainerPBImpl.java | 13 +
.../ContainerResourceChangeRequestPBImpl.java | 141 --------
.../yarn/api/records/impl/pb/ProtoUtils.java | 69 +++-
.../impl/pb/UpdateContainerErrorPBImpl.java | 125 +++++++
.../impl/pb/UpdateContainerRequestPBImpl.java | 187 ++++++++++
.../records/impl/pb/UpdatedContainerPBImpl.java | 117 +++++++
.../yarn/security/ContainerTokenIdentifier.java | 29 +-
.../src/main/proto/yarn_security_token.proto | 1 +
.../hadoop/yarn/api/TestPBImplRecords.java | 17 +-
.../yarn/security/TestYARNTokenIdentifier.java | 4 +-
.../api/protocolrecords/NMContainerStatus.java | 15 +-
.../impl/pb/NMContainerStatusPBImpl.java | 13 +
.../OpportunisticContainerAllocator.java | 2 +-
.../hadoop/yarn/server/utils/BuilderUtils.java | 14 +-
.../yarn_server_common_service_protos.proto | 1 +
.../protocolrecords/TestProtocolRecords.java | 4 +-
.../TestRegisterNodeManagerRequest.java | 2 +-
.../containermanager/ContainerManagerImpl.java | 16 +-
.../container/ContainerImpl.java | 7 +-
.../queuing/QueuingContainerManagerImpl.java | 3 +-
.../recovery/NMLeveldbStateStoreService.java | 41 ++-
.../recovery/NMNullStateStoreService.java | 4 +-
.../recovery/NMStateStoreService.java | 13 +-
.../nodemanager/TestNodeManagerResync.java | 2 +-
.../nodemanager/TestNodeStatusUpdater.java | 24 +-
.../amrmproxy/MockResourceManagerFacade.java | 4 +-
.../BaseContainerManagerTest.java | 2 +-
.../recovery/NMMemoryStateStoreService.java | 7 +-
.../TestNMLeveldbStateStoreService.java | 7 +-
.../nodemanager/webapp/MockContainer.java | 2 +-
.../nodemanager/webapp/TestNMWebServer.java | 6 +-
.../ApplicationMasterService.java | 54 ++-
.../server/resourcemanager/RMServerUtils.java | 338 ++++++++++---------
.../scheduler/AbstractYarnScheduler.java | 13 +-
.../scheduler/SchedContainerChangeRequest.java | 2 +-
.../scheduler/SchedulerApplicationAttempt.java | 12 +-
.../scheduler/YarnScheduler.java | 6 +-
.../scheduler/capacity/CapacityScheduler.java | 8 +-
.../scheduler/fair/FairScheduler.java | 6 +-
.../scheduler/fifo/FifoScheduler.java | 6 +-
.../security/RMContainerTokenSecretManager.java | 64 ++--
.../yarn/server/resourcemanager/MockAM.java | 7 +-
.../resourcemanager/TestApplicationCleanup.java | 9 +-
.../TestApplicationMasterService.java | 86 +++--
.../server/resourcemanager/TestRMRestart.java | 2 +-
.../TestResourceTrackerService.java | 8 +-
.../capacity/TestCapacityScheduler.java | 42 ++-
.../capacity/TestContainerAllocation.java | 13 +-
.../capacity/TestContainerResizing.java | 134 +++++---
.../capacity/TestIncreaseAllocationExpirer.java | 76 +++--
.../server/TestContainerManagerSecurity.java | 18 +-
.../TestMiniYarnClusterNodeUtilization.java | 2 -
.../src/test/proto/test_token.proto | 1 +
76 files changed, 2099 insertions(+), 1103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index f9e4595b..3fa0043 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -296,8 +297,7 @@ public class TestLocalContainerAllocator {
Resources.none(), null, 1, null,
Collections.<NMToken>emptyList(),
yarnToken,
- Collections.<Container>emptyList(),
- Collections.<Container>emptyList());
+ Collections.<UpdatedContainer>emptyList());
response.setApplicationPriority(Priority.newInstance(0));
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 44aa593..a115b13 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -99,7 +99,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -108,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -1703,8 +1703,8 @@ public class TestRMContainerAllocator {
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals,
- List<ContainerResourceChangeRequest> increaseRequests,
- List<ContainerResourceChangeRequest> decreaseRequests) {
+ List<UpdateContainerRequest> increaseRequests,
+ List<UpdateContainerRequest> decreaseRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
@@ -1750,8 +1750,8 @@ public class TestRMContainerAllocator {
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals,
- List<ContainerResourceChangeRequest> increaseRequest,
- List<ContainerResourceChangeRequest> decreaseRequests) {
+ List<UpdateContainerRequest> increaseRequest,
+ List<UpdateContainerRequest> decreaseRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 393300c..79f934c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -206,8 +206,8 @@ final public class ResourceSchedulerWrapper
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
List<String> strings, List<String> strings2,
- List<ContainerResourceChangeRequest> increaseRequests,
- List<ContainerResourceChangeRequest> decreaseRequests) {
+ List<UpdateContainerRequest> increaseRequests,
+ List<UpdateContainerRequest> decreaseRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 1c3fa79..cf08309 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -179,8 +179,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
List<String> strings, List<String> strings2,
- List<ContainerResourceChangeRequest> increaseRequests,
- List<ContainerResourceChangeRequest> decreaseRequests) {
+ List<UpdateContainerRequest> increaseRequests,
+ List<UpdateContainerRequest> decreaseRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
index e24ebdf..f7ce127 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -48,13 +48,8 @@ import org.apache.hadoop.yarn.util.Records;
* A list of unused {@link Container} which are being returned.
* </li>
* <li>
- * A list of {@link ContainerResourceChangeRequest} to inform
- * the <code>ResourceManager</code> about the resource increase
- * requirements of running containers.
- * </li>
- * <li>
- * A list of {@link ContainerResourceChangeRequest} to inform
- * the <code>ResourceManager</code> about the resource decrease
+ * A list of {@link UpdateContainerRequest} to inform
+ * the <code>ResourceManager</code> about the change in
* requirements of running containers.
* </li>
* </ul>
@@ -72,25 +67,23 @@ public abstract class AllocateRequest {
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest) {
return newInstance(responseID, appProgress, resourceAsk,
- containersToBeReleased, resourceBlacklistRequest, null, null);
+ containersToBeReleased, resourceBlacklistRequest, null);
}
@Public
- @Stable
+ @Unstable
public static AllocateRequest newInstance(int responseID, float appProgress,
List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest,
- List<ContainerResourceChangeRequest> increaseRequests,
- List<ContainerResourceChangeRequest> decreaseRequests) {
+ List<UpdateContainerRequest> updateRequests) {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk);
allocateRequest.setReleaseList(containersToBeReleased);
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
- allocateRequest.setIncreaseRequests(increaseRequests);
- allocateRequest.setDecreaseRequests(decreaseRequests);
+ allocateRequest.setUpdateRequests(updateRequests);
return allocateRequest;
}
@@ -197,48 +190,25 @@ public abstract class AllocateRequest {
ResourceBlacklistRequest resourceBlacklistRequest);
/**
- * Get the list of container resource increase requests being sent by the
- * <code>ApplicationMaster</code>.
- * @return the list of {@link ContainerResourceChangeRequest}
- * being sent by the
- * <code>ApplicationMaster</code>.
- */
- @Public
- @Unstable
- public abstract List<ContainerResourceChangeRequest> getIncreaseRequests();
-
- /**
- * Set the list of container resource increase requests to inform the
- * <code>ResourceManager</code> about the containers whose resources need
- * to be increased.
- * @param increaseRequests list of
- * {@link ContainerResourceChangeRequest}
- */
- @Public
- @Unstable
- public abstract void setIncreaseRequests(
- List<ContainerResourceChangeRequest> increaseRequests);
-
- /**
- * Get the list of container resource decrease requests being sent by the
+ * Get the list of container update requests being sent by the
* <code>ApplicationMaster</code>.
- * @return list of {@link ContainerResourceChangeRequest}
+ * @return list of {@link UpdateContainerRequest}
* being sent by the
* <code>ApplicationMaster</code>.
*/
@Public
@Unstable
- public abstract List<ContainerResourceChangeRequest> getDecreaseRequests();
+ public abstract List<UpdateContainerRequest> getUpdateRequests();
/**
- * Set the list of container resource decrease requests to inform the
- * <code>ResourceManager</code> about the containers whose resources need
- * to be decreased.
- * @param decreaseRequests list of
- * {@link ContainerResourceChangeRequest}
+ * Set the list of container update requests to inform the
+ * <code>ResourceManager</code> about the containers that need to be
+ * updated.
+ * @param updateRequests list of <code>UpdateContainerRequest</code> for
+ * containers to be updated
*/
@Public
@Unstable
- public abstract void setDecreaseRequests(
- List<ContainerResourceChangeRequest> decreaseRequests);
+ public abstract void setUpdateRequests(
+ List<UpdateContainerRequest> updateRequests);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 4fba423..69089ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -95,19 +98,17 @@ public abstract class AllocateResponse {
}
@Public
- @Stable
+ @Unstable
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens,
- List<Container> increasedContainers,
- List<Container> decreasedContainers) {
+ List<UpdatedContainer> updatedContainers) {
AllocateResponse response = newInstance(responseId, completedContainers,
allocatedContainers, updatedNodes, availResources, command,
numClusterNodes, preempt, nmTokens);
- response.setIncreasedContainers(increasedContainers);
- response.setDecreasedContainers(decreasedContainers);
+ response.setUpdatedContainers(updatedContainers);
return response;
}
@@ -118,12 +119,11 @@ public abstract class AllocateResponse {
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
- List<Container> increasedContainers,
- List<Container> decreasedContainers) {
+ List<UpdatedContainer> updatedContainers) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
- nmTokens, increasedContainers, decreasedContainers);
+ nmTokens, updatedContainers);
response.setAMRMToken(amRMToken);
return response;
}
@@ -135,13 +135,11 @@ public abstract class AllocateResponse {
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
- List<Container> increasedContainers,
- List<Container> decreasedContainers,
- String collectorAddr) {
+ List<UpdatedContainer> updatedContainers, String collectorAddr) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
- nmTokens, increasedContainers, decreasedContainers);
+ nmTokens, updatedContainers);
response.setAMRMToken(amRMToken);
response.setCollectorAddr(collectorAddr);
return response;
@@ -290,40 +288,24 @@ public abstract class AllocateResponse {
public abstract void setNMTokens(List<NMToken> nmTokens);
/**
- * Get the list of newly increased containers by
+ * Get the list of newly updated containers by
* <code>ResourceManager</code>.
* @return list of newly increased containers
*/
@Public
@Unstable
- public abstract List<Container> getIncreasedContainers();
-
- /**
- * Set the list of newly increased containers by
- * <code>ResourceManager</code>.
- */
- @Private
- @Unstable
- public abstract void setIncreasedContainers(
- List<Container> increasedContainers);
-
- /**
- * Get the list of newly decreased containers by
- * <code>ResourceManager</code>.
- * @return the list of newly decreased containers
- */
- @Public
- @Unstable
- public abstract List<Container> getDecreasedContainers();
+ public abstract List<UpdatedContainer> getUpdatedContainers();
/**
- * Set the list of newly decreased containers by
+ * Set the list of newly updated containers by
* <code>ResourceManager</code>.
+ *
+ * @param updatedContainers List of Updated Containers.
*/
@Private
@Unstable
- public abstract void setDecreasedContainers(
- List<Container> decreasedContainers);
+ public abstract void setUpdatedContainers(
+ List<UpdatedContainer> updatedContainers);
/**
* The AMRMToken that belong to this attempt
@@ -364,4 +346,28 @@ public abstract class AllocateResponse {
@Unstable
public abstract void setCollectorAddr(String collectorAddr);
+ /**
+ * Get the list of container update errors to inform the
+ * Application Master about the container updates that could not be
+ * satisfied due to error.
+ *
+ * @return List of Update Container Errors.
+ */
+ @Public
+ @Unstable
+ public List<UpdateContainerError> getUpdateErrors() {
+ return new ArrayList<>();
+ }
+
+ /**
+ * Set the list of container update errors to inform the
+ * Application Master about the container updates that could not be
+ * satisfied due to error.
+ * @param updateErrors list of <code>UpdateContainerError</code> for
+ * containers updates requests that were in error
+ */
+ @Public
+ @Unstable
+ public void setUpdateErrors(List<UpdateContainerError> updateErrors) {
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
index 707a71d..4fdc803 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
@@ -230,8 +230,30 @@ public abstract class Container implements Comparable<Container> {
* allocation.
*/
@Private
- @Evolving
+ @Unstable
public void setAllocationRequestId(long allocationRequestID) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Get the version of this container. The version will be incremented when
+ * a container is updated.
+ *
+ * @return version of this container.
+ */
+ @Private
+ @Unstable
+ public int getVersion() {
+ return 0;
+ }
+
+ /**
+ * Set the version of this container.
+ * @param version of this container.
+ */
+ @Private
+ @Unstable
+ public void setVersion(int version) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java
deleted file mode 100644
index 117015b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.api.records;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- * {@code ContainerResourceChangeRequest} represents the request made by an
- * application to the {@code ResourceManager} to change resource allocation of
- * a running {@code Container}.
- * <p>
- * It includes:
- * <ul>
- * <li>{@link ContainerId} for the container.</li>
- * <li>
- * {@link Resource} capability of the container after the resource change
- * is completed.
- * </li>
- * </ul>
- *
- * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
- */
-@Public
-@Unstable
-public abstract class ContainerResourceChangeRequest {
-
- @Public
- @Unstable
- public static ContainerResourceChangeRequest newInstance(
- ContainerId existingContainerId, Resource targetCapability) {
- ContainerResourceChangeRequest context = Records
- .newRecord(ContainerResourceChangeRequest.class);
- context.setContainerId(existingContainerId);
- context.setCapability(targetCapability);
- return context;
- }
-
- /**
- * Get the <code>ContainerId</code> of the container.
- * @return <code>ContainerId</code> of the container
- */
- @Public
- @Unstable
- public abstract ContainerId getContainerId();
-
- /**
- * Set the <code>ContainerId</code> of the container.
- * @param containerId <code>ContainerId</code> of the container
- */
- @Public
- @Unstable
- public abstract void setContainerId(ContainerId containerId);
-
- /**
- * Get the <code>Resource</code> capability of the container.
- * @return <code>Resource</code> capability of the container
- */
- @Public
- @Unstable
- public abstract Resource getCapability();
-
- /**
- * Set the <code>Resource</code> capability of the container.
- * @param capability <code>Resource</code> capability of the container
- */
- @Public
- @Unstable
- public abstract void setCapability(Resource capability);
-
- @Override
- public int hashCode() {
- return getCapability().hashCode() + getContainerId().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof ContainerResourceChangeRequest) {
- ContainerResourceChangeRequest ctx =
- (ContainerResourceChangeRequest) other;
-
- if (getContainerId() == null && ctx.getContainerId() != null) {
- return false;
- } else if (!getContainerId().equals(ctx.getContainerId())) {
- return false;
- }
-
- if (getCapability() == null && ctx.getCapability() != null) {
- return false;
- } else if (!getCapability().equals(ctx.getCapability())) {
- return false;
- }
-
- return true;
- } else {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
new file mode 100644
index 0000000..978ea09
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Encodes the type of Container Update.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum ContainerUpdateType {
+
+ /**
+ * Resource increase.
+ */
+ INCREASE_RESOURCE,
+
+ /**
+ * Resource decrease.
+ */
+ DECREASE_RESOURCE,
+
+ /**
+ * Execution Type change.
+ */
+ UPDATE_EXECUTION_TYPE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
new file mode 100644
index 0000000..7102f7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code UpdateContainerError} is used by the Scheduler to notify the
+ * ApplicationMaster of an UpdateContainerRequest it cannot satisfy due to
+ * an error in the request. It includes the update request as well as
+ * a reason for why the request was not satisfiable.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class UpdateContainerError {
+
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public static UpdateContainerError newInstance(String reason,
+ UpdateContainerRequest updateContainerRequest) {
+ UpdateContainerError error = Records.newRecord(UpdateContainerError.class);
+ error.setReason(reason);
+ error.setUpdateContainerRequest(updateContainerRequest);
+ return error;
+ }
+
+ /**
+ * Get reason why the update request was not satisfiable.
+ * @return Reason
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract String getReason();
+
+ /**
+ * Set reason why the update request was not satisfiable.
+ * @param reason Reason
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setReason(String reason);
+
+ /**
+ * Get the {@code UpdateContainerRequest} that was not satisfiable.
+ * @return UpdateContainerRequest
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract UpdateContainerRequest getUpdateContainerRequest();
+
+ /**
+ * Set the {@code UpdateContainerRequest} that was not satisfiable.
+ * @param updateContainerRequest Update Container Request
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setUpdateContainerRequest(
+ UpdateContainerRequest updateContainerRequest);
+
+ @Override
+ public int hashCode() {
+ final int prime = 2153;
+ int result = 2459;
+ String reason = getReason();
+ UpdateContainerRequest updateReq = getUpdateContainerRequest();
+ result = prime * result + ((reason == null) ? 0 : reason.hashCode());
+ result = prime * result + ((updateReq == null) ? 0 : updateReq.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ UpdateContainerError other = (UpdateContainerError) obj;
+ String reason = getReason();
+ if (reason == null) {
+ if (other.getReason() != null) {
+ return false;
+ }
+ } else if (!reason.equals(other.getReason())) {
+ return false;
+ }
+ UpdateContainerRequest req = getUpdateContainerRequest();
+ if (req == null) {
+ if (other.getUpdateContainerRequest() != null) {
+ return false;
+ }
+ } else if (!req.equals(other.getUpdateContainerRequest())) {
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
new file mode 100644
index 0000000..ef39f5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code UpdateContainerRequest} represents the request made by an
+ * application to the {@code ResourceManager} to update an attribute of a
+ * {@code Container} such as its Resource allocation or (@code ExecutionType}
+ * <p>
+ * It includes:
+ * <ul>
+ * <li>version for the container.</li>
+ * <li>{@link ContainerId} for the container.</li>
+ * <li>
+ * {@link Resource} capability of the container after the update request
+ * is completed.
+ * </li>
+ * <li>
+ * {@link ExecutionType} of the container after the update request is
+ * completed.
+ * </li>
+ * </ul>
+ *
+ * Update rules:
+ * <ul>
+ * <li>
+ * Currently only ONE aspect of the container can be updated per request
+ * (user can either update Capability OR ExecutionType in one request..
+ * not both).
+ * </li>
+ * <li>
+ * There must be only 1 update request per container in an allocate call.
+ * </li>
+ * <li>
+ * If a new update request is sent for a container (in a subsequent allocate
+ * call) before the first one is satisfied by the Scheduler, it will
+ * overwrite the previous request.
+ * </li>
+ * </ul>
+ * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class UpdateContainerRequest {
+
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public static UpdateContainerRequest newInstance(int version,
+ ContainerId containerId, ContainerUpdateType updateType,
+ Resource targetCapability, ExecutionType targetExecutionType) {
+ UpdateContainerRequest request =
+ Records.newRecord(UpdateContainerRequest.class);
+ request.setContainerVersion(version);
+ request.setContainerId(containerId);
+ request.setContainerUpdateType(updateType);
+ request.setExecutionType(targetExecutionType);
+ request.setCapability(targetCapability);
+ return request;
+ }
+
+ /**
+ * Get the <code>ContainerId</code> of the container.
+ * @return <code>ContainerId</code> of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract int getContainerVersion();
+
+ /**
+ * Set the current version of the container.
+ * @param containerVersion of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setContainerVersion(int containerVersion);
+
+ /**
+ * Get the <code>ContainerUpdateType</code> of the container.
+ * @return <code>ContainerUpdateType</code> of the container.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract ContainerUpdateType getContainerUpdateType();
+
+ /**
+ * Set the <code>ContainerUpdateType</code> of the container.
+ * @param updateType of the Container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setContainerUpdateType(ContainerUpdateType updateType);
+
+ /**
+ * Get the <code>ContainerId</code> of the container.
+ * @return <code>ContainerId</code> of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract ContainerId getContainerId();
+
+ /**
+ * Set the <code>ContainerId</code> of the container.
+ * @param containerId <code>ContainerId</code> of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setContainerId(ContainerId containerId);
+
+ /**
+ * Get the <code>Resource</code> capability of the container.
+ * @return <code>Resource</code> capability of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract Resource getCapability();
+
+ /**
+ * Set the <code>Resource</code> capability of the container.
+ * @param capability <code>Resource</code> capability of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setCapability(Resource capability);
+
+ /**
+ * Get the target <code>ExecutionType</code> of the container.
+ * @return <code>ExecutionType</code> of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set the target <code>ExecutionType</code> of the container.
+ * @param executionType <code>ExecutionType</code> of the container
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setExecutionType(ExecutionType executionType);
+
+ @Override
+ public int hashCode() {
+ final int prime = 2153;
+ int result = 2459;
+ ContainerId cId = getContainerId();
+ ExecutionType execType = getExecutionType();
+ Resource capability = getCapability();
+ result =
+ prime * result + ((capability == null) ? 0 : capability.hashCode());
+ result = prime * result + ((cId == null) ? 0 : cId.hashCode());
+ result = prime * result + getContainerVersion();
+ result = prime * result + ((execType == null) ? 0 : execType.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ UpdateContainerRequest other = (UpdateContainerRequest) obj;
+ Resource capability = getCapability();
+ if (capability == null) {
+ if (other.getCapability() != null) {
+ return false;
+ }
+ } else if (!capability.equals(other.getCapability())) {
+ return false;
+ }
+ ContainerId cId = getContainerId();
+ if (cId == null) {
+ if (other.getContainerId() != null) {
+ return false;
+ }
+ } else if (!cId.equals(other.getContainerId())) {
+ return false;
+ }
+ if (getContainerVersion() != other.getContainerVersion()) {
+ return false;
+ }
+ ExecutionType execType = getExecutionType();
+ if (execType == null) {
+ if (other.getExecutionType() != null) {
+ return false;
+ }
+ } else if (!execType.equals(other.getExecutionType())) {
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java
new file mode 100644
index 0000000..68f6ca1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An object that encapsulates an updated container and the
+ * type of Update.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class UpdatedContainer {
+
+ /**
+ * Static Factory method.
+ *
+ * @param updateType ContainerUpdateType
+ * @param container Container
+ * @return UpdatedContainer
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public static UpdatedContainer newInstance(ContainerUpdateType updateType,
+ Container container) {
+ UpdatedContainer updatedContainer =
+ Records.newRecord(UpdatedContainer.class);
+ updatedContainer.setUpdateType(updateType);
+ updatedContainer.setContainer(container);
+ return updatedContainer;
+ }
+
+ /**
+ * Get the <code>ContainerUpdateType</code>.
+ * @return ContainerUpdateType
+ */
+ public abstract ContainerUpdateType getUpdateType();
+
+ /**
+ * Set the <code>ContainerUpdateType</code>.
+ * @param updateType ContainerUpdateType
+ */
+ public abstract void setUpdateType(ContainerUpdateType updateType);
+
+ /**
+ * Get the <code>Container</code>.
+ * @return Container
+ */
+ public abstract Container getContainer();
+
+ /**
+ * Set the <code>Container</code>.
+ * @param container Container
+ */
+ public abstract void setContainer(Container container);
+
+ @Override
+ public int hashCode() {
+ final int prime = 2153;
+ int result = 2459;
+ ContainerUpdateType updateType = getUpdateType();
+ Container container = getContainer();
+ result = prime * result + ((updateType == null) ? 0 :
+ updateType.hashCode());
+ result = prime * result + ((container == null) ? 0 : container.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ UpdatedContainer other = (UpdatedContainer) obj;
+ ContainerUpdateType updateType = getUpdateType();
+ if (updateType == null) {
+ if (other.getUpdateType() != null) {
+ return false;
+ }
+ } else if (updateType != other.getUpdateType()) {
+ return false;
+ }
+ Container container = getContainer();
+ if (container == null) {
+ if (other.getContainer() != null) {
+ return false;
+ }
+ } else if (!container.equals(other.getContainer())) {
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 2cc1784..2d6007e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -94,6 +94,7 @@ message ContainerProto {
optional hadoop.common.TokenProto container_token = 6;
optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
optional int64 allocation_request_id = 8 [default = -1];
+ optional int32 version = 9 [default = 0];
}
message ContainerReportProto {
@@ -535,11 +536,6 @@ enum ContainerExitStatusProto {
DISKS_FAILED = -101;
}
-message ContainerResourceChangeRequestProto {
- optional ContainerIdProto container_id = 1;
- optional ResourceProto capability = 2;
-}
-
message ContainerRetryContextProto {
optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY];
repeated int32 error_codes = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 4abb80b..97eaa5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -60,14 +60,32 @@ message FinishApplicationMasterResponseProto {
optional bool isUnregistered = 1 [default = false];
}
+enum ContainerUpdateTypeProto {
+ INCREASE_RESOURCE = 0;
+ DECREASE_RESOURCE = 1;
+ UPDATE_EXECUTION_TYPE = 2;
+}
+
+message UpdateContainerRequestProto {
+ required int32 container_version = 1;
+ required ContainerIdProto container_id = 2;
+ required ContainerUpdateTypeProto update_type = 3;
+ optional ResourceProto capability = 4;
+ optional ExecutionTypeProto execution_type = 5;
+}
+
+message UpdateContainerErrorProto {
+ optional string reason = 1;
+ optional UpdateContainerRequestProto update_request = 2;
+}
+
message AllocateRequestProto {
repeated ResourceRequestProto ask = 1;
repeated ContainerIdProto release = 2;
optional ResourceBlacklistRequestProto blacklist_request = 3;
optional int32 response_id = 4;
optional float progress = 5;
- repeated ContainerResourceChangeRequestProto increase_request = 6;
- repeated ContainerResourceChangeRequestProto decrease_request = 7;
+ repeated UpdateContainerRequestProto update_requests = 6;
}
message NMTokenProto {
@@ -75,6 +93,11 @@ message NMTokenProto {
optional hadoop.common.TokenProto token = 2;
}
+message UpdatedContainerProto {
+ required ContainerUpdateTypeProto update_type = 1;
+ required ContainerProto container = 2;
+}
+
message AllocateResponseProto {
optional AMCommandProto a_m_command = 1;
optional int32 response_id = 2;
@@ -85,11 +108,11 @@ message AllocateResponseProto {
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
repeated NMTokenProto nm_tokens = 9;
- repeated ContainerProto increased_containers = 10;
- repeated ContainerProto decreased_containers = 11;
+ repeated UpdatedContainerProto updated_containers = 10;
optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13;
optional string collector_addr = 14;
+ repeated UpdateContainerErrorProto update_errors = 15;
}
enum SchedulerResourceTypes {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b9949e1..17dae6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -912,7 +913,8 @@ public class ApplicationMaster {
}
@Override
- public void onContainersResourceChanged(List<Container> containers) {}
+ public void onContainersUpdated(
+ List<UpdatedContainer> containers) {}
@Override
public void onShutdownRequest() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 10d2a2f..d2195a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -63,7 +65,7 @@ import com.google.common.annotations.VisibleForTesting;
* [run tasks on the containers]
* }
*
- * public void onContainersResourceChanged(List<Container> containers) {
+ * public void onContainersUpdated(List<Container> containers) {
* [determine if resource allocation of containers have been increased in
* the ResourceManager, and if so, inform the NodeManagers to increase the
* resource monitor/enforcement on the containers]
@@ -426,8 +428,9 @@ extends AbstractService {
* Called when the ResourceManager responds to a heartbeat with containers
* whose resource allocation has been changed.
*/
- public abstract void onContainersResourceChanged(
- List<Container> containers);
+ @Public
+ @Unstable
+ public abstract void onContainersUpdated(List<UpdatedContainer> containers);
/**
* Called when the ResourceManager wants the ApplicationMaster to shutdown
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 242df65..bc6cadd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
@@ -354,12 +355,11 @@ extends AMRMClientAsync<T> {
if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
// RM side of the implementation guarantees that there are
// no duplications between increased and decreased containers
- List<Container> changed = new ArrayList<>();
- changed.addAll(response.getIncreasedContainers());
- changed.addAll(response.getDecreasedContainers());
+ List<UpdatedContainer> changed = new ArrayList<>();
+ changed.addAll(response.getUpdatedContainers());
if (!changed.isEmpty()) {
((AMRMClientAsync.AbstractCallbackHandler) handler)
- .onContainersResourceChanged(changed);
+ .onContainersUpdated(changed);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 60834f6..6f6bb85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -261,36 +263,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
new HashMap<>();
try {
synchronized (this) {
- askList = new ArrayList<ResourceRequest>(ask.size());
- for(ResourceRequest r : ask) {
- // create a copy of ResourceRequest as we might change it while the
- // RPC layer is using it to send info across
- ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(),
- r.getResourceName(), r.getCapability(), r.getNumContainers(),
- r.getRelaxLocality(), r.getNodeLabelExpression(),
- r.getExecutionTypeRequest());
- rr.setAllocationRequestId(r.getAllocationRequestId());
- askList.add(rr);
- }
- List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
- List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
+ askList = cloneAsks();
// Save the current change for recovery
oldChange.putAll(change);
- for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
- change.entrySet()) {
- Container container = entry.getValue().getKey();
- Resource original = container.getResource();
- Resource target = entry.getValue().getValue();
- if (Resources.fitsIn(target, original)) {
- // This is a decrease request
- decreaseList.add(ContainerResourceChangeRequest.newInstance(
- container.getId(), target));
- } else {
- // This is an increase request
- increaseList.add(ContainerResourceChangeRequest.newInstance(
- container.getId(), target));
- }
- }
+ List<UpdateContainerRequest> updateList = createUpdateList();
releaseList = new ArrayList<ContainerId>(release);
// optimistically clear this collection assuming no RPC failure
ask.clear();
@@ -306,8 +282,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
- askList, releaseList, blacklistRequest,
- increaseList, decreaseList);
+ askList, releaseList, blacklistRequest, updateList);
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
@@ -358,9 +333,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
if (!pendingChange.isEmpty()) {
List<ContainerStatus> completed =
allocateResponse.getCompletedContainersStatuses();
- List<Container> changed = new ArrayList<>();
- changed.addAll(allocateResponse.getIncreasedContainers());
- changed.addAll(allocateResponse.getDecreasedContainers());
+ List<UpdatedContainer> changed = new ArrayList<>();
+ changed.addAll(allocateResponse.getUpdatedContainers());
// remove all pending change requests that belong to the completed
// containers
for (ContainerStatus status : completed) {
@@ -417,6 +391,40 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return allocateResponse;
}
+ private List<UpdateContainerRequest> createUpdateList() {
+ List<UpdateContainerRequest> updateList = new ArrayList<>();
+ for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
+ change.entrySet()) {
+ Resource targetCapability = entry.getValue().getValue();
+ Resource currCapability = entry.getValue().getKey().getResource();
+ int version = entry.getValue().getKey().getVersion();
+ ContainerUpdateType updateType =
+ ContainerUpdateType.INCREASE_RESOURCE;
+ if (Resources.fitsIn(targetCapability, currCapability)) {
+ updateType = ContainerUpdateType.DECREASE_RESOURCE;
+ }
+ updateList.add(
+ UpdateContainerRequest.newInstance(version, entry.getKey(),
+ updateType, targetCapability, null));
+ }
+ return updateList;
+ }
+
+ private List<ResourceRequest> cloneAsks() {
+ List<ResourceRequest> askList = new ArrayList<ResourceRequest>(ask.size());
+ for(ResourceRequest r : ask) {
+ // create a copy of ResourceRequest as we might change it while the
+ // RPC layer is using it to send info across
+ ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(),
+ r.getResourceName(), r.getCapability(), r.getNumContainers(),
+ r.getRelaxLocality(), r.getNodeLabelExpression(),
+ r.getExecutionTypeRequest());
+ rr.setAllocationRequestId(r.getAllocationRequestId());
+ askList.add(rr);
+ }
+ return askList;
+ }
+
protected void removePendingReleaseRequests(
List<ContainerStatus> completedContainersStatuses) {
for (ContainerStatus containerStatus : completedContainersStatuses) {
@@ -425,16 +433,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
protected void removePendingChangeRequests(
- List<Container> changedContainers) {
- for (Container changedContainer : changedContainers) {
- ContainerId containerId = changedContainer.getId();
+ List<UpdatedContainer> changedContainers) {
+ for (UpdatedContainer changedContainer : changedContainers) {
+ ContainerId containerId = changedContainer.getContainer().getId();
if (pendingChange.get(containerId) == null) {
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("RM has confirmed changed resource allocation for "
+ "container " + containerId + ". Current resource allocation:"
- + changedContainer.getResource()
+ + changedContainer.getContainer().getResource()
+ ". Remove pending change request:"
+ pendingChange.get(containerId).getValue());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index c7b3a94..dac82e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -45,9 +45,11 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -89,20 +91,21 @@ public class TestAMRMClientAsync {
TestCallbackHandler callbackHandler = new TestCallbackHandler();
final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
- when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
- @Override
- public AllocateResponse answer(InvocationOnMock invocation)
- throws Throwable {
- secondHeartbeatSync.incrementAndGet();
- while (heartbeatBlock.get()) {
- synchronized (heartbeatBlock) {
- heartbeatBlock.wait();
+ when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(
+ new Answer<AllocateResponse>() {
+ @Override
+ public AllocateResponse answer(InvocationOnMock invocation)
+ throws Throwable {
+ secondHeartbeatSync.incrementAndGet();
+ while (heartbeatBlock.get()) {
+ synchronized (heartbeatBlock) {
+ heartbeatBlock.wait();
+ }
+ }
+ secondHeartbeatSync.incrementAndGet();
+ return response2;
}
- }
- secondHeartbeatSync.incrementAndGet();
- return response2;
- }
- }).thenReturn(response3).thenReturn(emptyResponse);
+ }).thenReturn(response3).thenReturn(emptyResponse);
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
.thenReturn(null);
when(client.getAvailableResources()).thenAnswer(new Answer<Resource>() {
@@ -410,10 +413,21 @@ public class TestAMRMClientAsync {
List<ContainerStatus> completed, List<Container> allocated,
List<Container> increased, List<Container> decreased,
List<NMToken> nmTokens) {
+ List<UpdatedContainer> updatedContainers = new ArrayList<>();
+ for (Container c : increased) {
+ updatedContainers.add(
+ UpdatedContainer.newInstance(
+ ContainerUpdateType.INCREASE_RESOURCE, c));
+ }
+ for (Container c : decreased) {
+ updatedContainers.add(
+ UpdatedContainer.newInstance(
+ ContainerUpdateType.DECREASE_RESOURCE, c));
+ }
AllocateResponse response =
AllocateResponse.newInstance(0, completed, allocated,
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
- increased, decreased);
+ updatedContainers);
return response;
}
@@ -429,7 +443,7 @@ public class TestAMRMClientAsync {
extends AMRMClientAsync.AbstractCallbackHandler {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
- private final List<Container> changedContainers = new ArrayList<>();
+ private final List<UpdatedContainer> changedContainers = new ArrayList<>();
Exception savedException = null;
volatile boolean reboot = false;
Object notifier = new Object();
@@ -448,8 +462,8 @@ public class TestAMRMClientAsync {
return ret;
}
- public List<Container> takeChangedContainers() {
- List<Container> ret = null;
+ public List<UpdatedContainer> takeChangedContainers() {
+ List<UpdatedContainer> ret = null;
synchronized (changedContainers) {
if (!changedContainers.isEmpty()) {
ret = new ArrayList<>(changedContainers);
@@ -488,8 +502,8 @@ public class TestAMRMClientAsync {
}
@Override
- public void onContainersResourceChanged(
- List<Container> changed) {
+ public void onContainersUpdated(
+ List<UpdatedContainer> changed) {
synchronized (changedContainers) {
changedContainers.clear();
changedContainers.addAll(changed);
@@ -564,7 +578,8 @@ public class TestAMRMClientAsync {
public void onContainersAllocated(List<Container> containers) {}
@Override
- public void onContainersResourceChanged(List<Container> containers) {}
+ public void onContainersUpdated(
+ List<UpdatedContainer> containers) {}
@Override
public void onShutdownRequest() {}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index e0ad2c4..38178a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -1061,33 +1062,20 @@ public class TestAMRMClient {
Assert.assertEquals(2, amClientImpl.pendingChange.size());
// as of now: container1 asks to decrease to (512, 1)
// container2 asks to increase to (2048, 1)
- List<Container> decreasedContainers;
- List<Container> increasedContainers;
- int allocateAttempts = 0;
- int decreased = 0;
- int increased = 0;
- while (allocateAttempts < 30) {
- // send allocation requests
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- decreasedContainers = allocResponse.getDecreasedContainers();
- increasedContainers = allocResponse.getIncreasedContainers();
- decreased += decreasedContainers.size();
- increased += increasedContainers.size();
- if (allocateAttempts == 0) {
- // we should get decrease confirmation right away
- Assert.assertEquals(1, decreased);
- // After first allocate request check change size
- Assert.assertEquals(0, amClientImpl.change.size());
- } else if (increased == 1) {
- break;
- }
- // increase request is served after next NM heart beat is received
- // Sleeping and retrying allocate
- sleep(20);
- allocateAttempts++;
- }
- Assert.assertEquals(1, decreased);
- Assert.assertEquals(1, increased);
+ // send allocation requests
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ Assert.assertEquals(0, amClientImpl.change.size());
+ // we should get decrease confirmation right away
+ List<UpdatedContainer> updatedContainers =
+ allocResponse.getUpdatedContainers();
+ Assert.assertEquals(1, updatedContainers.size());
+ // we should get increase allocation after the next NM's heartbeat to RM
+ sleep(150);
+ // get allocations
+ allocResponse = amClient.allocate(0.1f);
+ updatedContainers =
+ allocResponse.getUpdatedContainers();
+ Assert.assertEquals(1, updatedContainers.size());
}
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 719d9a1..f1c49f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -39,12 +39,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -262,7 +262,7 @@ public class TestAMRMClientOnRMRestart {
// new NM to represent NM re-register
nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
NMContainerStatus containerReport =
- NMContainerStatus.newInstance(containerId, ContainerState.RUNNING,
+ NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
Resource.newInstance(1024, 1), "recover container", 0,
Priority.newInstance(0), 0);
nm1.registerNode(Collections.singletonList(containerReport),
@@ -399,7 +399,7 @@ public class TestAMRMClientOnRMRestart {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
NMContainerStatus containerReport =
- NMContainerStatus.newInstance(containerId, ContainerState.RUNNING,
+ NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
Resource.newInstance(1024, 1), "recover container", 0,
Priority.newInstance(0), 0);
nm1.registerNode(Arrays.asList(containerReport), null);
@@ -562,8 +562,8 @@ public class TestAMRMClientOnRMRestart {
List<ResourceRequest> lastAsk = null;
List<ContainerId> lastRelease = null;
- List<ContainerResourceChangeRequest> lastIncrease = null;
- List<ContainerResourceChangeRequest> lastDecrease = null;
+ List<UpdateContainerRequest> lastIncrease = null;
+ List<UpdateContainerRequest> lastDecrease = null;
List<String> lastBlacklistAdditions;
List<String> lastBlacklistRemovals;
@@ -574,8 +574,8 @@ public class TestAMRMClientOnRMRestart {
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals,
- List<ContainerResourceChangeRequest> increaseRequests,
- List<ContainerResourceChangeRequest> decreaseRequests) {
+ List<UpdateContainerRequest> increaseRequests,
+ List<UpdateContainerRequest> decreaseRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org