You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2016/08/27 00:17:37 UTC
[6/8] hadoop git commit: YARN-4889. Changes in AMRMClient for
identifying resource-requests explicitly. (Arun Suresh via wangda)
YARN-4889. Changes in AMRMClient for identifying resource-requests explicitly. (Arun Suresh via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19c743c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19c743c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19c743c1
Branch: refs/heads/HADOOP-13341
Commit: 19c743c1bbcaf3df8f1d63e557143c960a538c42
Parents: b930dc3
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Aug 26 16:48:00 2016 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Aug 26 17:14:12 2016 -0700
----------------------------------------------------------------------
.../yarn/api/records/ResourceRequest.java | 19 +-
.../hadoop/yarn/client/api/AMRMClient.java | 132 +++++++++++-
.../yarn/client/api/async/AMRMClientAsync.java | 18 ++
.../yarn/client/api/impl/AMRMClientImpl.java | 163 ++++++++++-----
.../client/api/impl/RemoteRequestsTable.java | 11 +-
.../yarn/client/api/impl/TestAMRMClient.java | 200 ++++++++++++++++---
.../impl/TestAMRMClientContainerRequest.java | 7 +-
.../api/impl/TestDistributedScheduling.java | 24 ++-
.../yarn/client/api/impl/TestNMClient.java | 6 +-
9 files changed, 468 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 07f132c..2d6f0f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -112,6 +112,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
// Compare priority, host and capability
int ret = r1.getPriority().compareTo(r2.getPriority());
if (ret == 0) {
+ ret = Long.compare(
+ r1.getAllocationRequestId(), r2.getAllocationRequestId());
+ }
+ if (ret == 0) {
String h1 = r1.getResourceName();
String h2 = r2.getResourceName();
ret = h1.compareTo(h2);
@@ -381,6 +385,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + getNumContainers();
result = prime * result + ((priority == null) ? 0 : priority.hashCode());
+ result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
return result;
}
@@ -422,6 +427,11 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
.equals(other.getExecutionTypeRequest().getExecutionType())) {
return false;
}
+
+ if (getAllocationRequestId() != other.getAllocationRequestId()) {
+ return false;
+ }
+
if (getNodeLabelExpression() == null) {
if (other.getNodeLabelExpression() != null) {
return false;
@@ -452,7 +462,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
int capabilityComparison =
this.getCapability().compareTo(other.getCapability());
if (capabilityComparison == 0) {
- return this.getNumContainers() - other.getNumContainers();
+ int numContainerComparison =
+ this.getNumContainers() - other.getNumContainers();
+ if (numContainerComparison == 0) {
+ return Long.compare(getAllocationRequestId(),
+ other.getAllocationRequestId());
+ } else {
+ return numContainerComparison;
+ }
} else {
return capabilityComparison;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 79d587a..2990c05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -110,6 +110,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
final List<String> nodes;
final List<String> racks;
final Priority priority;
+ final long allocationRequestId;
final boolean relaxLocality;
final String nodeLabelsExpression;
final ExecutionTypeRequest executionTypeRequest;
@@ -134,6 +135,31 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
String[] racks, Priority priority) {
this(capability, nodes, racks, priority, true, null);
}
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints and
+ * locality relaxation enabled.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param allocationRequestId Id used to match Containers to this request
+ */
+ @Public
+ @InterfaceStability.Evolving
+ public ContainerRequest(Resource capability, String[] nodes,
+ String[] racks, Priority priority, long allocationRequestId) {
+ this(capability, nodes, racks, priority, allocationRequestId, true, null,
+ ExecutionTypeRequest.newInstance());
+ }
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
@@ -175,16 +201,79 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
+ * @param allocationRequestId Id used to match Containers to this request
+ */
+ @Public
+ @InterfaceStability.Evolving
+ public ContainerRequest(Resource capability, String[] nodes,
+ String[] racks, Priority priority, long allocationRequestId,
+ boolean relaxLocality) {
+ this(capability, nodes, racks, priority, allocationRequestId,
+ relaxLocality, null, ExecutionTypeRequest.newInstance());
+ }
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param relaxLocality
+ * If true, containers for this request may be assigned on hosts
+ * and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
- this(capability, nodes, racks, priority, relaxLocality,
+ this(capability, nodes, racks, priority, 0, relaxLocality,
nodeLabelsExpression,
ExecutionTypeRequest.newInstance());
}
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param allocationRequestId
+ * The allocationRequestId of the request. To be used as a tracking
+ * id to match Containers allocated against this request. Will
+ * default to 0 if not specified.
+ * @param relaxLocality
+ * If true, containers for this request may be assigned on hosts
+ * and racks other than the ones explicitly requested.
+ * @param nodeLabelsExpression
+ * Set node labels to allocate resource, now we only support
+ * asking for only a single node label
+ */
+ @Public
+ @InterfaceStability.Evolving
+ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
+ Priority priority, long allocationRequestId, boolean relaxLocality,
+ String nodeLabelsExpression) {
+ this(capability, nodes, racks, priority, allocationRequestId,
+ relaxLocality, nodeLabelsExpression,
+ ExecutionTypeRequest.newInstance());
+ }
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
@@ -200,6 +289,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
+ * @param allocationRequestId
+ * The allocationRequestId of the request. To be used as a tracking
+ * id to match Containers allocated against this request. Will
+ * default to 0 if not specified.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
@@ -210,7 +303,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* Set the execution type of the container request.
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
- Priority priority, boolean relaxLocality, String nodeLabelsExpression,
+ Priority priority, long allocationRequestId, boolean relaxLocality,
+ String nodeLabelsExpression,
ExecutionTypeRequest executionTypeRequest) {
// Validate request
Preconditions.checkArgument(capability != null,
@@ -223,6 +317,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
&& (nodes == null || nodes.length == 0)),
"Can't turn off locality relaxation on a " +
"request with no location constraints");
+ this.allocationRequestId = allocationRequestId;
this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
@@ -247,6 +342,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
public Priority getPriority() {
return priority;
}
+
+ public long getAllocationRequestId() {
+ return allocationRequestId;
+ }
public boolean getRelaxLocality() {
return relaxLocality;
@@ -264,6 +363,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]");
+ sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
.append("]");
return sb.toString();
@@ -390,6 +490,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* Each collection in the list contains requests with identical
* <code>Resource</code> size that fit in the given capability. In a
* collection, requests will be returned in the same order as they were added.
+ *
+ * NOTE: This API only matches Container requests that were created by the
+ * client WITHOUT the allocationRequestId being set.
+ *
* @return Collection of request matching the parameters
*/
@InterfaceStability.Evolving
@@ -407,7 +511,11 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* Each collection in the list contains requests with identical
* <code>Resource</code> size that fit in the given capability. In a
* collection, requests will be returned in the same order as they were added.
- * specify an <code>ExecutionType</code> .
+ * This variant also matches on the given <code>ExecutionType</code>.
+ *
+ * NOTE: This API only matches Container requests that were created by the
+ * client WITHOUT the allocationRequestId being set.
+ *
* @param priority Priority
* @param resourceName Location
* @param executionType ExecutionType
@@ -421,7 +529,23 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
throw new UnsupportedOperationException("The sub-class extending" +
" AMRMClient is expected to implement this !!");
}
-
+
+ /**
+ * Get outstanding <code>ContainerRequest</code>s matching the given
+ * allocationRequestId. These ContainerRequests should have been added via
+ * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+ * the AMRMClient may return its internal collection directly without creating
+ * a copy. Users should not perform mutating operations on the return value.
+ *
+ * NOTE: This API only matches Container requests that were created by the
+ * client WITH the allocationRequestId being set to a non-default value.
+ *
+ * @param allocationRequestId Allocation Request Id
+ * @return Collection of request matching the parameters
+ */
+ @InterfaceStability.Evolving
+ public abstract Collection<T> getMatchingRequests(long allocationRequestId);
+
/**
* Update application's blacklist with addition or removal resources.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index d06fadc..10d2a2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -202,6 +202,10 @@ extends AbstractService {
/**
* Returns all matching ContainerRequests that match the given Priority,
* ResourceName, ExecutionType and Capability.
+ *
+ * NOTE: This matches only requests that were made by the client WITHOUT the
+ * allocationRequestId specified.
+ *
* @param priority Priority.
* @param resourceName Location.
* @param executionType ExecutionType.
@@ -214,6 +218,20 @@ extends AbstractService {
return client.getMatchingRequests(priority, resourceName,
executionType, capability);
}
+
+ /**
+ * Returns all matching ContainerRequests that match the given
+ * AllocationRequestId.
+ *
+ * NOTE: This matches only requests that were made by the client WITH the
+ * allocationRequestId specified.
+ *
+ * @param allocationRequestId AllocationRequestId.
+ * @return All matching ContainerRequests
+ */
+ public Collection<T> getMatchingRequests(long allocationRequestId) {
+ return client.getMatchingRequests(allocationRequestId);
+ }
/**
* Registers this application master with the resource manager. On successful
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4145944..60834f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -108,10 +108,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
- ResourceRequestInfo(Priority priority, String resourceName,
- Resource capability, boolean relaxLocality) {
+ ResourceRequestInfo(Long allocationRequestId, Priority priority,
+ String resourceName, Resource capability, boolean relaxLocality) {
remoteRequest = ResourceRequest.newInstance(priority, resourceName,
capability, 0);
+ remoteRequest.setAllocationRequestId(allocationRequestId);
remoteRequest.setRelaxLocality(relaxLocality);
containerRequests = new LinkedHashSet<T>();
}
@@ -154,7 +155,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return (mem0 <= mem1 && cpu0 <= cpu1);
}
- final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
+ private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
+ new HashMap<>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@@ -263,10 +265,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
- askList.add(ResourceRequest.newInstance(r.getPriority(),
+ ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression(),
- r.getExecutionTypeRequest()));
+ r.getExecutionTypeRequest());
+ rr.setAllocationRequestId(r.getAllocationRequestId());
+ askList.add(rr);
}
List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@@ -318,11 +322,14 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
- @SuppressWarnings("unchecked")
- Iterator<ResourceRequestInfo<T>> reqIter =
- remoteRequestsTable.iterator();
- while (reqIter.hasNext()) {
- addResourceRequestToAsk(reqIter.next().remoteRequest);
+ for (RemoteRequestsTable remoteRequestsTable :
+ remoteRequests.values()) {
+ @SuppressWarnings("unchecked")
+ Iterator<ResourceRequestInfo<T>> reqIter =
+ remoteRequestsTable.iterator();
+ while (reqIter.hasNext()) {
+ addResourceRequestToAsk(reqIter.next().remoteRequest);
+ }
}
change.putAll(this.pendingChange);
}
@@ -498,15 +505,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// check that specific and non-specific requests cannot be mixed within a
// priority
- checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
- req.getRelaxLocality());
+ checkLocalityRelaxationConflict(req.getAllocationRequestId(),
+ req.getPriority(), ANY_LIST, req.getRelaxLocality());
// check that specific rack cannot be mixed with specific node within a
// priority. If node and its rack are both specified then they must be
// in the same request.
// For explicitly requested racks, we set locality relaxation to true
- checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
- checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
- req.getRelaxLocality());
+ checkLocalityRelaxationConflict(req.getAllocationRequestId(),
+ req.getPriority(), dedupedRacks, true);
+ checkLocalityRelaxationConflict(req.getAllocationRequestId(),
+ req.getPriority(), inferredRacks, req.getRelaxLocality());
// check if the node label expression specified is valid
checkNodeLabelExpression(req);
@@ -607,6 +615,24 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return clusterNodeCount;
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection<T> getMatchingRequests(long allocationRequestId) {
+ RemoteRequestsTable remoteRequestsTable = getTable(allocationRequestId);
+ LinkedHashSet<T> list = new LinkedHashSet<>();
+
+ if (remoteRequestsTable != null) {
+ Iterator<ResourceRequestInfo<T>> reqIter =
+ remoteRequestsTable.iterator();
+ while (reqIter.hasNext()) {
+ ResourceRequestInfo<T> resReqInfo = reqIter.next();
+ list.addAll(resReqInfo.containerRequests);
+ }
+ }
+ return list;
+ }
+
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority,
@@ -617,6 +643,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
@Override
+ @SuppressWarnings("unchecked")
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
@@ -626,9 +653,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
"The priority at which to request containers should not be null ");
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
- @SuppressWarnings("unchecked")
+ RemoteRequestsTable remoteRequestsTable = getTable(0);
List<ResourceRequestInfo<T>> matchingRequests =
- this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
+ remoteRequestsTable.getMatchingRequests(priority, resourceName,
executionType, capability);
// If no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
@@ -664,23 +691,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
* ContainerRequests with locality relaxation cannot be made at the same
* priority as ContainerRequests without locality relaxation.
*/
- private void checkLocalityRelaxationConflict(Priority priority,
- Collection<String> locations, boolean relaxLocality) {
+ private void checkLocalityRelaxationConflict(Long allocationReqId,
+ Priority priority, Collection<String> locations, boolean relaxLocality) {
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
- @SuppressWarnings("unchecked")
- List<ResourceRequestInfo> allCapabilityMaps =
- remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
- for (ResourceRequestInfo reqs : allCapabilityMaps) {
- ResourceRequest remoteRequest = reqs.remoteRequest;
- boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
- if (relaxLocality != existingRelaxLocality) {
- throw new InvalidContainerRequestException("Cannot submit a "
- + "ContainerRequest asking for location "
- + remoteRequest.getResourceName() + " with locality relaxation "
- + relaxLocality + " when it has already been requested"
- + "with locality relaxation " + existingRelaxLocality);
+ RemoteRequestsTable<T> remoteRequestsTable = getTable(allocationReqId);
+ if (remoteRequestsTable != null) {
+ @SuppressWarnings("unchecked")
+ List<ResourceRequestInfo> allCapabilityMaps =
+ remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
+ for (ResourceRequestInfo reqs : allCapabilityMaps) {
+ ResourceRequest remoteRequest = reqs.remoteRequest;
+ boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
+ if (relaxLocality != existingRelaxLocality) {
+ throw new InvalidContainerRequestException("Cannot submit a "
+ + "ContainerRequest asking for location "
+ + remoteRequest.getResourceName() + " with locality relaxation "
+ + relaxLocality + " when it has already been requested"
+ + "with locality relaxation " + existingRelaxLocality);
+ }
}
}
}
@@ -742,10 +772,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private void addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
boolean relaxLocality, String labelExpression) {
+ RemoteRequestsTable<T> remoteRequestsTable =
+ getTable(req.getAllocationRequestId());
+ if (remoteRequestsTable == null) {
+ remoteRequestsTable = new RemoteRequestsTable<T>();
+ putTable(req.getAllocationRequestId(), remoteRequestsTable);
+ }
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
- .addResourceRequest(priority, resourceName,
- execTypeReq, capability, req, relaxLocality, labelExpression);
+ .addResourceRequest(req.getAllocationRequestId(), priority,
+ resourceName, execTypeReq, capability, req, relaxLocality,
+ labelExpression);
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@@ -761,29 +798,37 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
private void decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
- @SuppressWarnings("unchecked")
- ResourceRequestInfo resourceRequestInfo =
- remoteRequestsTable.decResourceRequest(priority, resourceName,
- execTypeReq, capability, req);
- // send the ResourceRequest to RM even if is 0 because it needs to override
- // a previously sent value. If ResourceRequest was not sent previously then
- // sending 0 aught to be a no-op on RM
- if (resourceRequestInfo != null) {
- addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
-
- // delete entry from map if no longer needed
- if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
- this.remoteRequestsTable.remove(priority, resourceName,
- execTypeReq.getExecutionType(), capability);
- }
+ RemoteRequestsTable<T> remoteRequestsTable =
+ getTable(req.getAllocationRequestId());
+ if (remoteRequestsTable != null) {
+ @SuppressWarnings("unchecked")
+ ResourceRequestInfo resourceRequestInfo =
+ remoteRequestsTable.decResourceRequest(priority, resourceName,
+ execTypeReq, capability, req);
+ // send the ResourceRequest to RM even if is 0 because it needs to
+ // override a previously sent value. If ResourceRequest was not sent
+ // previously then sending 0 ought to be a no-op on RM
+ if (resourceRequestInfo != null) {
+ addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+ // delete entry from map if no longer needed
+ if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+ remoteRequestsTable.remove(priority, resourceName,
+ execTypeReq.getExecutionType(), capability);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("AFTER decResourceRequest:" + " applicationId="
- + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + resourceRequestInfo.remoteRequest.getNumContainers()
- + " #asks=" + ask.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AFTER decResourceRequest:"
+ + " allocationRequestId=" + req.getAllocationRequestId()
+ + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + resourceRequestInfo.remoteRequest.getNumContainers()
+ + " #asks=" + ask.size());
+ }
}
+ } else {
+ LOG.info("No remoteRequestTable found with allocationRequestId="
+ + req.getAllocationRequestId());
}
}
@@ -829,4 +874,14 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
currentUGI.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
}
+
+ @VisibleForTesting
+ RemoteRequestsTable<T> getTable(long allocationRequestId) {
+ return remoteRequests.get(Long.valueOf(allocationRequestId));
+ }
+
+ RemoteRequestsTable<T> putTable(long allocationRequestId,
+ RemoteRequestsTable<T> table) {
+ return remoteRequests.put(Long.valueOf(allocationRequestId), table);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
index 853a512..110ca79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -264,15 +264,16 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
}
@SuppressWarnings("unchecked")
- ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
- ExecutionTypeRequest execTypeReq, Resource capability, T req,
- boolean relaxLocality, String labelExpression) {
+ ResourceRequestInfo addResourceRequest(Long allocationRequestId,
+ Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
+ Resource capability, T req, boolean relaxLocality,
+ String labelExpression) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
- new ResourceRequestInfo(priority, resourceName, capability,
- relaxLocality);
+ new ResourceRequestInfo(allocationRequestId, priority, resourceName,
+ capability, relaxLocality);
put(priority, resourceName, execTypeReq.getExecutionType(), capability,
resourceRequestInfo);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 1eeeb78..e0ad2c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -268,6 +269,18 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer5);
amClient.addContainerRequest(storedContainer6);
amClient.addContainerRequest(storedContainer7);
+
+ // Add some CRs with allocReqIds... These will not be returned by
+ // the default getMatchingRequests
+ ContainerRequest storedContainer11 =
+ new ContainerRequest(capability1, nodes, racks, priority, 1);
+ ContainerRequest storedContainer33 =
+ new ContainerRequest(capability3, nodes, racks, priority, 3);
+ ContainerRequest storedContainer43 =
+ new ContainerRequest(capability4, nodes, racks, priority, 3);
+ amClient.addContainerRequest(storedContainer11);
+ amClient.addContainerRequest(storedContainer33);
+ amClient.addContainerRequest(storedContainer43);
// test matching of containers
List<? extends Collection<ContainerRequest>> matches;
@@ -279,6 +292,25 @@ public class TestAMRMClient {
storedRequest = matches.get(0).iterator().next();
assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
+
+ // exact match for allocReqId 1
+ Collection<ContainerRequest> reqIdMatches =
+ amClient.getMatchingRequests(1);
+ assertEquals(1, reqIdMatches.size());
+ storedRequest = reqIdMatches.iterator().next();
+ assertEquals(storedContainer11, storedRequest);
+ amClient.removeContainerRequest(storedContainer11);
+
+ // exact match for allocReqId 3
+ reqIdMatches = amClient.getMatchingRequests(3);
+ assertEquals(2, reqIdMatches.size());
+ Iterator<ContainerRequest> iter = reqIdMatches.iterator();
+ storedRequest = iter.next();
+ assertEquals(storedContainer43, storedRequest);
+ amClient.removeContainerRequest(storedContainer43);
+ storedRequest = iter.next();
+ assertEquals(storedContainer33, storedRequest);
+ amClient.removeContainerRequest(storedContainer33);
// exact matching with order maintained
Resource testCapability2 = Resource.newInstance(2000, 1);
@@ -364,26 +396,32 @@ public class TestAMRMClient {
ContainerRequest storedGuarContainer2 =
new ContainerRequest(capability2, nodes, racks, priority);
ContainerRequest storedOpportContainer1 =
- new ContainerRequest(capability1, nodes, racks, priority, true, null,
+ new ContainerRequest(capability1, nodes, racks, priority,
+ 0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer2 =
- new ContainerRequest(capability2, nodes, racks, priority, true, null,
+ new ContainerRequest(capability2, nodes, racks, priority,
+ 0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer3 =
- new ContainerRequest(capability3, nodes, racks, priority, true, null,
+ new ContainerRequest(capability3, nodes, racks, priority,
+ 0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer4 =
- new ContainerRequest(capability4, nodes, racks, priority, true, null,
+ new ContainerRequest(capability4, nodes, racks, priority,
+ 0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer5 =
- new ContainerRequest(capability5, nodes, racks, priority, true, null,
+ new ContainerRequest(capability5, nodes, racks, priority,
+ 0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer6 =
- new ContainerRequest(capability6, nodes, racks, priority, true, null,
+ new ContainerRequest(capability6, nodes, racks, priority,
+ 0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer7 =
new ContainerRequest(capability7, nodes, racks, priority2,
- false, null,
+ 0, false, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
amClient.addContainerRequest(storedGuarContainer1);
amClient.addContainerRequest(storedGuarContainer2);
@@ -541,11 +579,13 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer3);
// test addition and storage
- int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+ RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
+ amClient.getTable(0);
+ int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
- containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
+ containersRequestedAny = remoteRequestsTable.get(priority1,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
@@ -584,7 +624,7 @@ public class TestAMRMClient {
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
assertTrue(matches.isEmpty());
// 0 requests left. everything got cleaned up
- assertTrue(amClient.remoteRequestsTable.isEmpty());
+ assertTrue(amClient.getTable(0).isEmpty());
// go through an exemplary allocation, matching and release cycle
amClient.addContainerRequest(storedContainer1);
@@ -628,7 +668,7 @@ public class TestAMRMClient {
assertEquals(0, amClient.ask.size());
assertEquals(0, allocResponse.getAllocatedContainers().size());
// 0 requests left. everything got cleaned up
- assertTrue(amClient.remoteRequestsTable.isEmpty());
+ assertTrue(remoteRequestsTable.isEmpty());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
@@ -780,6 +820,16 @@ public class TestAMRMClient {
@Test (timeout=60000)
public void testAMRMClient() throws YarnException, IOException {
+ initAMRMClientAndTest(false);
+ }
+
+ @Test (timeout=60000)
+ public void testAMRMClientAllocReqId() throws YarnException, IOException {
+ initAMRMClientAndTest(true);
+ }
+
+ private void initAMRMClientAndTest(boolean useAllocReqId)
+ throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null;
try {
// start am rm client
@@ -796,7 +846,11 @@ public class TestAMRMClient {
amClient.registerApplicationMaster("Host", 10000, "");
- testAllocation((AMRMClientImpl<ContainerRequest>)amClient);
+ if (useAllocReqId) {
+ testAllocRequestId((AMRMClientImpl<ContainerRequest>)amClient);
+ } else {
+ testAllocation((AMRMClientImpl<ContainerRequest>) amClient);
+ }
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
@@ -1055,22 +1109,9 @@ public class TestAMRMClient {
new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
-
- int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
- node, ExecutionType.GUARANTEED, capability).remoteRequest
- .getNumContainers();
- int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
- rack, ExecutionType.GUARANTEED, capability).remoteRequest
- .getNumContainers();
- int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
- .remoteRequest.getNumContainers();
- assertEquals(2, containersRequestedNode);
- assertEquals(2, containersRequestedRack);
- assertEquals(2, containersRequestedAny);
- assertEquals(3, amClient.ask.size());
- assertEquals(0, amClient.release.size());
+ assertNumContainers(amClient, 0, 2, 2, 2, 3, 0);
+ int containersRequestedAny = 2;
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
@@ -1163,10 +1204,15 @@ public class TestAMRMClient {
// verify that the remove request made in between makeRequest and allocate
// has not been lost
assertEquals(0, snoopRequest.getNumContainers());
-
- iterationsLeft = 3;
+
+ waitForContainerCompletion(3, amClient, releases);
+ }
+
+ private void waitForContainerCompletion(int numIterations,
+ AMRMClientImpl<ContainerRequest> amClient, Set<ContainerId> releases)
+ throws YarnException, IOException {
// do a few iterations to ensure RM is not going send new containers
- while(!releases.isEmpty() || iterationsLeft-- > 0) {
+ while(!releases.isEmpty() || numIterations-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
@@ -1181,7 +1227,7 @@ public class TestAMRMClient {
}
}
}
- if(iterationsLeft > 0) {
+ if(numIterations > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
@@ -1190,6 +1236,98 @@ public class TestAMRMClient {
assertEquals(0, amClient.release.size());
}
+ private void testAllocRequestId(
+ final AMRMClientImpl<ContainerRequest> amClient) throws YarnException,
+ IOException {
+ // setup container request
+
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 1));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 1));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 1));
+ amClient.removeContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority, 2));
+
+ assertNumContainers(amClient, 0, 1, 1, 1, 9, 0);
+ assertNumContainers(amClient, 1, 1, 1, 1, 9, 0);
+ assertNumContainers(amClient, 2, 1, 1, 1, 9, 0);
+ int containersRequestedAny = 3;
+
+ // RM should allocate the containers within a few calls to allocate()
+ List<Container> allocatedContainers = new ArrayList<>();
+ int iterationsLeft = 5;
+ Set<ContainerId> releases = new TreeSet<ContainerId>();
+
+ while (allocatedContainers.size() < containersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+ for(Container container : allocResponse.getAllocatedContainers()) {
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+
+ if(allocatedContainers.size() < containersRequestedAny) {
+ // sleep to let the NMs heartbeat to the RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(containersRequestedAny, allocatedContainers.size());
+ Set<Long> expAllocIds = new HashSet<>(
+ Arrays.asList(Long.valueOf(0), Long.valueOf(1), Long.valueOf(2)));
+ Set<Long> actAllocIds = new HashSet<>();
+ for (Container ac : allocatedContainers) {
+ actAllocIds.add(Long.valueOf(ac.getAllocationRequestId()));
+ }
+ assertEquals(expAllocIds, actAllocIds);
+ assertEquals(3, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
+
+ waitForContainerCompletion(3, amClient, releases);
+ }
+
+ private void assertNumContainers(AMRMClientImpl<ContainerRequest> amClient,
+ long allocationReqId, int expNode, int expRack, int expAny,
+ int expAsks, int expRelease) {
+ RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
+ amClient.getTable(allocationReqId);
+ int containersRequestedNode = remoteRequestsTable.get(priority,
+ node, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = remoteRequestsTable.get(priority,
+ rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = remoteRequestsTable.get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
+
+ assertEquals(expNode, containersRequestedNode);
+ assertEquals(expRack, containersRequestedRack);
+ assertEquals(expAny, containersRequestedAny);
+ assertEquals(expAsks, amClient.ask.size());
+ assertEquals(expRelease, amClient.release.size());
+ }
+
class CountDownSupplier implements Supplier<Boolean> {
int counter = 0;
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index 2db33c1..ad18da3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -61,7 +61,7 @@ public class TestAMRMClientContainerRequest {
verifyResourceRequest(client, request, ResourceRequest.ANY, true);
ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
- new String[] {"/rack2"}, Priority.newInstance(1), true, null,
+ new String[] {"/rack2"}, Priority.newInstance(1), 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
client.addContainerRequest(request2);
@@ -274,8 +274,9 @@ public class TestAMRMClientContainerRequest {
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality,
ExecutionType executionType) {
- ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
- location, executionType, request.getCapability()).remoteRequest;
+ ResourceRequest ask = client.getTable(0)
+ .get(request.getPriority(), location, executionType,
+ request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 40390d4..4cfc4eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -366,12 +366,12 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
- true, null,
+ 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
- true, null,
+ 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
@@ -381,21 +381,23 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
- true, null,
+ 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
- int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+ RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
+ amClient.getTable(0);
+ int containersRequestedNode = remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
- int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+ int containersRequestedRack = remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
- int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+ int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
- amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
+ remoteRequestsTable.get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@@ -457,7 +459,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
- true, null,
+ 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
assertEquals(4, amClient.ask.size());
@@ -469,7 +471,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
- true, null,
+ 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
@@ -490,7 +492,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null,
- priority2, true, null,
+ priority2, 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
throw new Exception();
@@ -571,7 +573,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
ExecutionTypeRequest execTypeRequest =
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
- capability, nodes, racks, priority, true, null, execTypeRequest);
+ capability, nodes, racks, priority, 0, true, null, execTypeRequest);
amClient.addContainerRequest(containerRequest);
// Wait until the container is allocated
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 969fb70..3640883 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -252,9 +252,9 @@ public class TestNMClient {
racks, priority));
}
- int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
- .remoteRequest.getNumContainers();
+ int containersRequestedAny = rmClient.getTable(0)
+ .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
+ capability).remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org