You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2013/12/09 18:48:01 UTC
svn commit: r1549629 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/proto/
hadoop-yarn/hadoop-yarn-common/src/main...
Author: sandy
Date: Mon Dec 9 17:48:00 2013
New Revision: 1549629
URL: http://svn.apache.org/r1549629
Log:
YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan via Sandy Ryza)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java
- copied unchanged from r1549627, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
- copied unchanged from r1549627, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1549629&r1=1549628&r2=1549629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Dec 9 17:48:00 2013
@@ -28,6 +28,9 @@ Release 2.4.0 - UNRELEASED
YARN-1447. Common PB type definitions for container resizing. (Wangda Tan
via Sandy Ryza)
+ YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan
+ via Sandy Ryza)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java?rev=1549629&r1=1549628&r2=1549629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java Mon Dec 9 17:48:00 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.Applic
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;
@@ -60,12 +61,24 @@ public abstract class AllocateRequest {
List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest) {
+ return newInstance(responseID, appProgress, resourceAsk,
+ containersToBeReleased, resourceBlacklistRequest, null);
+ }
+
+ @Public
+ @Stable
+ public static AllocateRequest newInstance(int responseID, float appProgress,
+ List<ResourceRequest> resourceAsk,
+ List<ContainerId> containersToBeReleased,
+ ResourceBlacklistRequest resourceBlacklistRequest,
+ List<ContainerResourceIncreaseRequest> increaseRequests) {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk);
allocateRequest.setReleaseList(containersToBeReleased);
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
+ allocateRequest.setIncreaseRequests(increaseRequests);
return allocateRequest;
}
@@ -170,4 +183,22 @@ public abstract class AllocateRequest {
@Stable
public abstract void setResourceBlacklistRequest(
ResourceBlacklistRequest resourceBlacklistRequest);
+
+ /**
+ * Get the <code>ContainerResourceIncreaseRequest</code>s being sent by the
+ * <code>ApplicationMaster</code>.
+ */
+ @Public
+ @Stable
+ public abstract List<ContainerResourceIncreaseRequest> getIncreaseRequests();
+
+ /**
+ * Set the <code>ContainerResourceIncreaseRequest</code>s that inform the
+ * <code>ResourceManager</code> which containers need their resources
+ * increased.
+ */
+ @Public
+ @Stable
+ public abstract void setIncreaseRequests(
+ List<ContainerResourceIncreaseRequest> increaseRequests);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java?rev=1549629&r1=1549628&r2=1549629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java Mon Dec 9 17:48:00 2013
@@ -28,6 +28,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -82,6 +84,23 @@ public abstract class AllocateResponse {
response.setNMTokens(nmTokens);
return response;
}
+
+ @Public
+ @Stable
+ public static AllocateResponse newInstance(int responseId,
+ List<ContainerStatus> completedContainers,
+ List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+ Resource availResources, AMCommand command, int numClusterNodes,
+ PreemptionMessage preempt, List<NMToken> nmTokens,
+ List<ContainerResourceIncrease> increasedContainers,
+ List<ContainerResourceDecrease> decreasedContainers) {
+ AllocateResponse response = newInstance(responseId, completedContainers,
+ allocatedContainers, updatedNodes, availResources, command,
+ numClusterNodes, preempt, nmTokens);
+ response.setIncreasedContainers(increasedContainers);
+ response.setDecreasedContainers(decreasedContainers);
+ return response;
+ }
/**
* If the <code>ResourceManager</code> needs the
@@ -221,4 +240,34 @@ public abstract class AllocateResponse {
@Private
@Unstable
public abstract void setNMTokens(List<NMToken> nmTokens);
+
+ /**
+ * Get the list of newly increased containers by <code>ResourceManager</code>
+ */
+ @Public
+ @Stable
+ public abstract List<ContainerResourceIncrease> getIncreasedContainers();
+
+ /**
+ * Set the list of newly increased containers by <code>ResourceManager</code>
+ */
+ @Private
+ @Unstable
+ public abstract void setIncreasedContainers(
+ List<ContainerResourceIncrease> increasedContainers);
+
+ /**
+ * Get the list of newly decreased containers by <code>NodeManager</code>
+ */
+ @Public
+ @Stable
+ public abstract List<ContainerResourceDecrease> getDecreasedContainers();
+
+ /**
+ * Set the list of newly decreased containers by <code>NodeManager</code>
+ */
+ @Private
+ @Unstable
+ public abstract void setDecreasedContainers(
+ List<ContainerResourceDecrease> decreasedContainers);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1549629&r1=1549628&r2=1549629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Mon Dec 9 17:48:00 2013
@@ -62,6 +62,7 @@ message AllocateRequestProto {
optional ResourceBlacklistRequestProto blacklist_request = 3;
optional int32 response_id = 4;
optional float progress = 5;
+ repeated ContainerResourceIncreaseRequestProto increase_request = 6;
}
message NMTokenProto {
@@ -79,6 +80,8 @@ message AllocateResponseProto {
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
repeated NMTokenProto nm_tokens = 9;
+ repeated ContainerResourceIncreaseProto increased_containers = 10;
+ repeated ContainerResourceDecreaseProto decreased_containers = 11;
}
//////////////////////////////////////////////////////
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java?rev=1549629&r1=1549628&r2=1549629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java Mon Dec 9 17:48:00 2013
@@ -27,12 +27,15 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreaseRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
@@ -49,9 +52,9 @@ public class AllocateRequestPBImpl exten
private List<ResourceRequest> ask = null;
private List<ContainerId> release = null;
+ private List<ContainerResourceIncreaseRequest> increaseRequests = null;
private ResourceBlacklistRequest blacklistRequest = null;
-
public AllocateRequestPBImpl() {
builder = AllocateRequestProto.newBuilder();
}
@@ -62,7 +65,7 @@ public class AllocateRequestPBImpl exten
}
public AllocateRequestProto getProto() {
- mergeLocalToProto();
+ mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
@@ -95,6 +98,9 @@ public class AllocateRequestPBImpl exten
if (this.release != null) {
addReleasesToProto();
}
+ if (this.increaseRequests != null) {
+ addIncreaseRequestsToProto();
+ }
if (this.blacklistRequest != null) {
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
}
@@ -156,6 +162,23 @@ public class AllocateRequestPBImpl exten
}
@Override
+ public List<ContainerResourceIncreaseRequest> getIncreaseRequests() {
+ initIncreaseRequests();
+ return this.increaseRequests;
+ }
+
+ @Override
+ public void setIncreaseRequests(
+ List<ContainerResourceIncreaseRequest> increaseRequests) {
+ if (increaseRequests == null) {
+ return;
+ }
+ initIncreaseRequests();
+ this.increaseRequests.clear();
+ this.increaseRequests.addAll(increaseRequests);
+ }
+
+ @Override
public ResourceBlacklistRequest getResourceBlacklistRequest() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.blacklistRequest != null) {
@@ -223,6 +246,57 @@ public class AllocateRequestPBImpl exten
};
builder.addAllAsk(iterable);
}
+
+ private void initIncreaseRequests() {
+ if (this.increaseRequests != null) {
+ return;
+ }
+ AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerResourceIncreaseRequestProto> list =
+ p.getIncreaseRequestList();
+ this.increaseRequests = new ArrayList<ContainerResourceIncreaseRequest>();
+
+ for (ContainerResourceIncreaseRequestProto c : list) {
+ this.increaseRequests.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addIncreaseRequestsToProto() {
+ maybeInitBuilder();
+ builder.clearIncreaseRequest();
+ if (increaseRequests == null) {
+ return;
+ }
+ Iterable<ContainerResourceIncreaseRequestProto> iterable =
+ new Iterable<ContainerResourceIncreaseRequestProto>() {
+ @Override
+ public Iterator<ContainerResourceIncreaseRequestProto> iterator() {
+ return new Iterator<ContainerResourceIncreaseRequestProto>() {
+
+ Iterator<ContainerResourceIncreaseRequest> iter =
+ increaseRequests.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerResourceIncreaseRequestProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ }
+ };
+ builder.addAllIncreaseRequest(iterable);
+ }
+
@Override
public List<ContainerId> getReleaseList() {
initReleases();
@@ -292,6 +366,16 @@ public class AllocateRequestPBImpl exten
private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
return ((ResourceRequestPBImpl)t).getProto();
}
+
+ private ContainerResourceIncreaseRequestPBImpl convertFromProtoFormat(
+ ContainerResourceIncreaseRequestProto p) {
+ return new ContainerResourceIncreaseRequestPBImpl(p);
+ }
+
+ private ContainerResourceIncreaseRequestProto convertToProtoFormat(
+ ContainerResourceIncreaseRequest t) {
+ return ((ContainerResourceIncreaseRequestPBImpl) t).getProto();
+ }
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
@@ -308,6 +392,4 @@ public class AllocateRequestPBImpl exten
private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) {
return ((ResourceBlacklistRequestPBImpl)t).getProto();
}
-
-
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java?rev=1549629&r1=1549628&r2=1549629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java Mon Dec 9 17:48:00 2013
@@ -28,12 +28,16 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
@@ -41,6 +45,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
@@ -63,6 +69,8 @@ public class AllocateResponsePBImpl exte
private List<Container> allocatedContainers = null;
private List<NMToken> nmTokens = null;
private List<ContainerStatus> completedContainersStatuses = null;
+ private List<ContainerResourceIncrease> increasedContainers = null;
+ private List<ContainerResourceDecrease> decreasedContainers = null;
private List<NodeReport> updatedNodes = null;
private PreemptionMessage preempt;
@@ -108,7 +116,7 @@ public class AllocateResponsePBImpl exte
if (this.allocatedContainers != null) {
builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable =
- getProtoIterable(this.allocatedContainers);
+ getContainerProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
if (nmTokens != null) {
@@ -134,6 +142,18 @@ public class AllocateResponsePBImpl exte
if (this.preempt != null) {
builder.setPreempt(convertToProtoFormat(this.preempt));
}
+ if (this.increasedContainers != null) {
+ builder.clearIncreasedContainers();
+ Iterable<ContainerResourceIncreaseProto> iterable =
+ getIncreaseProtoIterable(this.increasedContainers);
+ builder.addAllIncreasedContainers(iterable);
+ }
+ if (this.decreasedContainers != null) {
+ builder.clearDecreasedContainers();
+ Iterable<ContainerResourceDecreaseProto> iterable =
+ getChangeProtoIterable(this.decreasedContainers);
+ builder.addAllDecreasedContainers(iterable);
+ }
}
private synchronized void mergeLocalToProto() {
@@ -306,6 +326,63 @@ public class AllocateResponsePBImpl exte
this.preempt = preempt;
}
+ @Override
+ public synchronized List<ContainerResourceIncrease> getIncreasedContainers() {
+ initLocalIncreasedContainerList();
+ return increasedContainers;
+ }
+
+ @Override
+ public synchronized void setIncreasedContainers(
+ List<ContainerResourceIncrease> increasedContainers) {
+ if (increasedContainers == null)
+ return;
+ initLocalIncreasedContainerList();
+ this.increasedContainers.addAll(increasedContainers);
+ }
+
+ @Override
+ public synchronized List<ContainerResourceDecrease> getDecreasedContainers() {
+ initLocalDecreasedContainerList();
+ return decreasedContainers;
+ }
+
+ @Override
+ public synchronized void setDecreasedContainers(
+ List<ContainerResourceDecrease> decreasedContainers) {
+ if (decreasedContainers == null) {
+ return;
+ }
+ initLocalDecreasedContainerList();
+ this.decreasedContainers.addAll(decreasedContainers);
+ }
+
+ private synchronized void initLocalIncreasedContainerList() {
+ if (this.increasedContainers != null) {
+ return;
+ }
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerResourceIncreaseProto> list = p.getIncreasedContainersList();
+ increasedContainers = new ArrayList<ContainerResourceIncrease>();
+
+ for (ContainerResourceIncreaseProto c : list) {
+ increasedContainers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private synchronized void initLocalDecreasedContainerList() {
+ if (this.decreasedContainers != null) {
+ return;
+ }
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerResourceDecreaseProto> list = p.getDecreasedContainersList();
+ decreasedContainers = new ArrayList<ContainerResourceDecrease>();
+
+ for (ContainerResourceDecreaseProto c : list) {
+ decreasedContainers.add(convertFromProtoFormat(c));
+ }
+ }
+
// Once this is called. updatedNodes will never be null - until a getProto is
// called.
private synchronized void initLocalNewNodeReportList() {
@@ -348,7 +425,71 @@ public class AllocateResponsePBImpl exte
}
}
- private synchronized Iterable<ContainerProto> getProtoIterable(
+ private synchronized Iterable<ContainerResourceIncreaseProto>
+ getIncreaseProtoIterable(
+ final List<ContainerResourceIncrease> newContainersList) {
+ maybeInitBuilder();
+ return new Iterable<ContainerResourceIncreaseProto>() {
+ @Override
+ public synchronized Iterator<ContainerResourceIncreaseProto> iterator() {
+ return new Iterator<ContainerResourceIncreaseProto>() {
+
+ Iterator<ContainerResourceIncrease> iter = newContainersList
+ .iterator();
+
+ @Override
+ public synchronized boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public synchronized ContainerResourceIncreaseProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public synchronized void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ }
+ };
+ }
+
+ private synchronized Iterable<ContainerResourceDecreaseProto>
+ getChangeProtoIterable(
+ final List<ContainerResourceDecrease> newContainersList) {
+ maybeInitBuilder();
+ return new Iterable<ContainerResourceDecreaseProto>() {
+ @Override
+ public synchronized Iterator<ContainerResourceDecreaseProto> iterator() {
+ return new Iterator<ContainerResourceDecreaseProto>() {
+
+ Iterator<ContainerResourceDecrease> iter = newContainersList
+ .iterator();
+
+ @Override
+ public synchronized boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public synchronized ContainerResourceDecreaseProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public synchronized void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ }
+ };
+ }
+
+ private synchronized Iterable<ContainerProto> getContainerProtoIterable(
final List<Container> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerProto>() {
@@ -467,7 +608,6 @@ public class AllocateResponsePBImpl exte
}
};
-
}
};
}
@@ -486,6 +626,26 @@ public class AllocateResponsePBImpl exte
completedContainersStatuses.add(convertFromProtoFormat(c));
}
}
+
+ private synchronized ContainerResourceIncrease convertFromProtoFormat(
+ ContainerResourceIncreaseProto p) {
+ return new ContainerResourceIncreasePBImpl(p);
+ }
+
+ private synchronized ContainerResourceIncreaseProto convertToProtoFormat(
+ ContainerResourceIncrease t) {
+ return ((ContainerResourceIncreasePBImpl) t).getProto();
+ }
+
+ private synchronized ContainerResourceDecrease convertFromProtoFormat(
+ ContainerResourceDecreaseProto p) {
+ return new ContainerResourceDecreasePBImpl(p);
+ }
+
+ private synchronized ContainerResourceDecreaseProto convertToProtoFormat(
+ ContainerResourceDecrease t) {
+ return ((ContainerResourceDecreasePBImpl) t).getProto();
+ }
private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) {
@@ -500,8 +660,9 @@ public class AllocateResponsePBImpl exte
ContainerProto p) {
return new ContainerPBImpl(p);
}
-
- private synchronized ContainerProto convertToProtoFormat(Container t) {
+
+ private synchronized ContainerProto convertToProtoFormat(
+ Container t) {
return ((ContainerPBImpl)t).getProto();
}