You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/16 18:32:43 UTC
[19/49] hadoop git commit: YARN-5124. Modify AMRMClient to set the
ExecutionType in the ResourceRequest. (asuresh)
YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/51432779
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/51432779
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/51432779
Branch: refs/heads/HDFS-7240
Commit: 51432779588fdd741b4840601f5db637ec783d92
Parents: 5279af7
Author: Arun Suresh <as...@apache.org>
Authored: Sun Jun 12 09:42:38 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Sun Jun 12 09:42:38 2016 -0700
----------------------------------------------------------------------
.../hadoop/yarn/client/api/AMRMClient.java | 43 +-
.../yarn/client/api/async/AMRMClientAsync.java | 17 +
.../yarn/client/api/impl/AMRMClientImpl.java | 294 +++------
.../client/api/impl/RemoteRequestsTable.java | 332 ++++++++++
.../client/api/impl/BaseAMRMProxyE2ETest.java | 197 ++++++
.../yarn/client/api/impl/TestAMRMClient.java | 26 +-
.../impl/TestAMRMClientContainerRequest.java | 54 +-
.../yarn/client/api/impl/TestAMRMProxy.java | 171 +----
.../api/impl/TestDistributedScheduling.java | 644 ++++++++++++-------
.../yarn/client/api/impl/TestNMClient.java | 7 +-
.../records/impl/pb/ResourceRequestPBImpl.java | 5 +-
11 files changed, 1204 insertions(+), 586 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 3ec0899..5f362c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -109,7 +110,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
final Priority priority;
final boolean relaxLocality;
final String nodeLabelsExpression;
- final ExecutionType executionType;
+ final ExecutionTypeRequest executionTypeRequest;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@@ -180,7 +181,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
this(capability, nodes, racks, priority, relaxLocality,
nodeLabelsExpression,
- ExecutionType.GUARANTEED);
+ ExecutionTypeRequest.newInstance());
}
/**
@@ -203,12 +204,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
- * @param executionType
+ * @param executionTypeRequest
* Set the execution type of the container request.
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression,
- ExecutionType executionType) {
+ ExecutionTypeRequest executionTypeRequest) {
// Validate request
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
@@ -226,7 +227,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this.priority = priority;
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
- this.executionType = executionType;
+ this.executionTypeRequest = executionTypeRequest;
}
public Resource getCapability() {
@@ -253,15 +254,16 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return nodeLabelsExpression;
}
- public ExecutionType getExecutionType() {
- return executionType;
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ return executionTypeRequest;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]");
- sb.append("ExecutionType[").append(executionType).append("]");
+ sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
+ .append("]");
return sb.toString();
}
}
@@ -388,10 +390,35 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* collection, requests will be returned in the same order as they were added.
* @return Collection of request matching the parameters
*/
+ @InterfaceStability.Evolving
public abstract List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability);
+
+ /**
+ * Get outstanding <code>ContainerRequest</code>s matching the given
+ * parameters. These ContainerRequests should have been added via
+ * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+ * the AMRMClient may return its internal collection directly without creating
+ * a copy. Users should not perform mutable operations on the return value.
+ * Each collection in the list contains requests with identical
+ * <code>Resource</code> size that fit in the given capability. In a
+ * collection, requests will be returned in the same order as they were added.
+ * This variant also takes an <code>ExecutionType</code>.
+ * @param priority Priority
+ * @param resourceName Location
+ * @param executionType ExecutionType
+ * @param capability Capability
+ * @return Collection of request matching the parameters
+ */
+ @InterfaceStability.Evolving
+ public List<? extends Collection<T>> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ Resource capability) {
+ throw new UnsupportedOperationException("The sub-class extending" +
+ " AMRMClient is expected to implement this !!");
+ }
/**
* Update application's blacklist with addition or removal resources.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 3c8f923..2f95156 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -196,6 +197,22 @@ extends AbstractService {
Priority priority,
String resourceName,
Resource capability);
+
+ /**
+ * Returns all matching ContainerRequests that match the given Priority,
+ * ResourceName, ExecutionType and Capability.
+ * @param priority Priority.
+ * @param resourceName Location.
+ * @param executionType ExecutionType.
+ * @param capability Capability.
+ * @return All matching ContainerRequests
+ */
+ public List<? extends Collection<T>> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ Resource capability) {
+ return client.getMatchingRequests(priority, resourceName,
+ executionType, capability);
+ }
/**
* Registers this application master with the resource manager. On successful
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4366c25..4145944 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -19,19 +19,19 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.TreeSet;
import java.util.AbstractMap.SimpleEntry;
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -102,7 +104,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
- class ResourceRequestInfo {
+ static class ResourceRequestInfo<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
@@ -115,11 +117,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
-
/**
* Class compares Resource by memory then cpu in reverse order
*/
- class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+ static class ResourceReverseMemoryThenCpuComparator implements
+ Comparator<Resource>, Serializable {
+ static final long serialVersionUID = 12345L;
@Override
public int compare(Resource arg0, Resource arg1) {
long mem0 = arg0.getMemorySize();
@@ -141,7 +144,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return -1;
}
}
-
+
static boolean canFit(Resource arg0, Resource arg1) {
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
@@ -150,17 +153,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return (mem0 <= mem1 && cpu0 <= cpu1);
}
-
- //Key -> Priority
- //Value -> Map
- //Key->ResourceName (e.g., nodename, rackname, *)
- //Value->Map
- //Key->Resource Capability
- //Value->ResourceRequest
- protected final
- Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
- remoteRequestsTable =
- new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
+
+ final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@@ -185,6 +179,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
super(AMRMClientImpl.class.getName());
}
+ @VisibleForTesting
+ AMRMClientImpl(ApplicationMasterProtocol protocol) {
+ super(AMRMClientImpl.class.getName());
+ this.rmClient = protocol;
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
RackResolver.init(conf);
@@ -195,8 +195,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
try {
- rmClient =
- ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
+ if (rmClient == null) {
+ rmClient = ClientRMProxy.createRMProxy(
+ conf, ApplicationMasterProtocol.class);
+ }
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@@ -263,7 +265,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
- r.getRelaxLocality(), r.getNodeLabelExpression()));
+ r.getRelaxLocality(), r.getNodeLabelExpression(),
+ r.getExecutionTypeRequest()));
}
List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@@ -315,13 +318,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
- for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
- .values()) {
- for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
- for (ResourceRequestInfo request : capabalities.values()) {
- addResourceRequestToAsk(request.remoteRequest);
- }
- }
+ @SuppressWarnings("unchecked")
+ Iterator<ResourceRequestInfo<T>> reqIter =
+ remoteRequestsTable.iterator();
+ while (reqIter.hasNext()) {
+ addResourceRequestToAsk(reqIter.next().remoteRequest);
}
change.putAll(this.pendingChange);
}
@@ -517,26 +518,28 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+ joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
- addResourceRequest(req.getPriority(), node, req.getCapability(), req,
- true, req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), node,
+ req.getExecutionTypeRequest(), req.getCapability(), req, true,
+ req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
- addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
- true, req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+ req.getCapability(), req, true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
- addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
- req.getRelaxLocality(), req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+ req.getCapability(), req, req.getRelaxLocality(),
+ req.getNodeLabelExpression());
}
-
// Off-switch
- addResourceRequest(req.getPriority(), ResourceRequest.ANY,
- req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), ResourceRequest.ANY,
+ req.getExecutionTypeRequest(), req.getCapability(), req,
+ req.getRelaxLocality(), req.getNodeLabelExpression());
}
@Override
@@ -552,16 +555,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// Update resource requests
if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) {
- decResourceRequest(req.getPriority(), node, req.getCapability(), req);
+ decResourceRequest(req.getPriority(), node,
+ req.getExecutionTypeRequest(), req.getCapability(), req);
}
}
for (String rack : allRacks) {
- decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
+ decResourceRequest(req.getPriority(), rack,
+ req.getExecutionTypeRequest(), req.getCapability(), req);
}
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
- req.getCapability(), req);
+ req.getExecutionTypeRequest(), req.getCapability(), req);
}
@Override
@@ -601,47 +606,38 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
public synchronized int getClusterNodeCount() {
return clusterNodeCount;
}
-
+
+ @Override
+ public synchronized List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability) {
+ return getMatchingRequests(priority, resourceName,
+ ExecutionType.GUARANTEED, capability);
+ }
+
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
- Priority priority,
- String resourceName,
- Resource capability) {
+ Priority priority, String resourceName, ExecutionType executionType,
+ Resource capability) {
Preconditions.checkArgument(capability != null,
"The Resource to be requested should not be null ");
Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null ");
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
- Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- if (remoteRequests == null) {
- return list;
- }
- TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
- .get(resourceName);
- if (reqMap == null) {
- return list;
- }
- ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
- if (resourceRequestInfo != null &&
- !resourceRequestInfo.containerRequests.isEmpty()) {
- list.add(resourceRequestInfo.containerRequests);
- return list;
- }
-
- // no exact match. Container may be larger than what was requested.
- // get all resources <= capability. map is reverse sorted.
- SortedMap<Resource, ResourceRequestInfo> tailMap =
- reqMap.tailMap(capability);
- for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
- if (canFit(entry.getKey(), capability) &&
- !entry.getValue().containerRequests.isEmpty()) {
- // match found that fits in the larger resource
- list.add(entry.getValue().containerRequests);
+ @SuppressWarnings("unchecked")
+ List<ResourceRequestInfo<T>> matchingRequests =
+ this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
+ executionType, capability);
+ // If no exact match. Container may be larger than what was requested.
+ // get all resources <= capability. map is reverse sorted.
+ for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
+ if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
+ !resReqInfo.containerRequests.isEmpty()) {
+ list.add(resReqInfo.containerRequests);
}
}
-
// no match found
return list;
}
@@ -663,34 +659,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return racks;
}
-
+
/**
* ContainerRequests with locality relaxation cannot be made at the same
* priority as ContainerRequests without locality relaxation.
*/
private void checkLocalityRelaxationConflict(Priority priority,
Collection<String> locations, boolean relaxLocality) {
- Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- if (remoteRequests == null) {
- return;
- }
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
- for (String location : locations) {
- TreeMap<Resource, ResourceRequestInfo> reqs =
- remoteRequests.get(location);
- if (reqs != null && !reqs.isEmpty()) {
- boolean existingRelaxLocality =
- reqs.values().iterator().next().remoteRequest.getRelaxLocality();
- if (relaxLocality != existingRelaxLocality) {
- throw new InvalidContainerRequestException("Cannot submit a "
- + "ContainerRequest asking for location " + location
- + " with locality relaxation " + relaxLocality + " when it has "
- + "already been requested with locality relaxation " + existingRelaxLocality);
- }
- }
+
+ @SuppressWarnings("unchecked")
+ List<ResourceRequestInfo> allCapabilityMaps =
+ remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
+ for (ResourceRequestInfo reqs : allCapabilityMaps) {
+ ResourceRequest remoteRequest = reqs.remoteRequest;
+ boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
+ if (relaxLocality != existingRelaxLocality) {
+ throw new InvalidContainerRequestException("Cannot submit a "
+ + "ContainerRequest asking for location "
+ + remoteRequest.getResourceName() + " with locality relaxation "
+ + relaxLocality + " when it has already been requested "
+ + "with locality relaxation " + existingRelaxLocality);
}
+ }
}
/**
@@ -747,46 +739,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ask.add(remoteRequest);
}
- private void
- addResourceRequest(Priority priority, String resourceName,
- Resource capability, T req, boolean relaxLocality,
- String labelExpression) {
- Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
- this.remoteRequestsTable.get(priority);
- if (remoteRequests == null) {
- remoteRequests =
- new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
- this.remoteRequestsTable.put(priority, remoteRequests);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added priority=" + priority);
- }
- }
- TreeMap<Resource, ResourceRequestInfo> reqMap =
- remoteRequests.get(resourceName);
- if (reqMap == null) {
- // capabilities are stored in reverse sorted order. smallest last.
- reqMap = new TreeMap<Resource, ResourceRequestInfo>(
- new ResourceReverseMemoryThenCpuComparator());
- remoteRequests.put(resourceName, reqMap);
- }
- ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
- if (resourceRequestInfo == null) {
- resourceRequestInfo =
- new ResourceRequestInfo(priority, resourceName, capability,
- relaxLocality);
- reqMap.put(capability, resourceRequestInfo);
- }
-
- resourceRequestInfo.remoteRequest.setNumContainers(
- resourceRequestInfo.remoteRequest.getNumContainers() + 1);
-
- if (relaxLocality) {
- resourceRequestInfo.containerRequests.add(req);
- }
-
- if (ResourceRequest.ANY.equals(resourceName)) {
- resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
- }
+ private void addResourceRequest(Priority priority, String resourceName,
+ ExecutionTypeRequest execTypeReq, Resource capability, T req,
+ boolean relaxLocality, String labelExpression) {
+ @SuppressWarnings("unchecked")
+ ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
+ .addResourceRequest(priority, resourceName,
+ execTypeReq, capability, req, relaxLocality, labelExpression);
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@@ -800,70 +759,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
- private void decResourceRequest(Priority priority,
- String resourceName,
- Resource capability,
- T req) {
- Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
- this.remoteRequestsTable.get(priority);
-
- if(remoteRequests == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not decrementing resource as priority " + priority
- + " is not present in request table");
- }
- return;
- }
-
- Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not decrementing resource as " + resourceName
- + " is not present in request table");
- }
- return;
- }
- ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("BEFORE decResourceRequest:" + " applicationId="
- + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + resourceRequestInfo.remoteRequest.getNumContainers()
- + " #asks=" + ask.size());
- }
-
- resourceRequestInfo.remoteRequest.setNumContainers(
- resourceRequestInfo.remoteRequest.getNumContainers() - 1);
-
- resourceRequestInfo.containerRequests.remove(req);
-
- if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
- // guard against spurious removals
- resourceRequestInfo.remoteRequest.setNumContainers(0);
- }
+ private void decResourceRequest(Priority priority, String resourceName,
+ ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+ @SuppressWarnings("unchecked")
+ ResourceRequestInfo resourceRequestInfo =
+ remoteRequestsTable.decResourceRequest(priority, resourceName,
+ execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to override
// a previously sent value. If ResourceRequest was not sent previously then
// sending 0 aught to be a no-op on RM
- addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+ if (resourceRequestInfo != null) {
+ addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
- // delete entries from map if no longer needed
- if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
- reqMap.remove(capability);
- if (reqMap.size() == 0) {
- remoteRequests.remove(resourceName);
+ // delete entry from map if no longer needed
+ if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+ this.remoteRequestsTable.remove(priority, resourceName,
+ execTypeReq.getExecutionType(), capability);
}
- if (remoteRequests.size() == 0) {
- remoteRequestsTable.remove(priority);
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("AFTER decResourceRequest:" + " applicationId="
- + " priority=" + priority.getPriority()
- + " resourceName=" + resourceName + " numContainers="
- + resourceRequestInfo.remoteRequest.getNumContainers()
- + " #asks=" + ask.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AFTER decResourceRequest:" + " applicationId="
+ + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + resourceRequestInfo.remoteRequest.getNumContainers()
+ + " #asks=" + ask.size());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
new file mode 100644
index 0000000..853a512
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
+
+class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
+
+ private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
+
+ static ResourceReverseMemoryThenCpuComparator resourceComparator =
+ new ResourceReverseMemoryThenCpuComparator();
+
+ /**
+ * Nested Iterator that iterates over just the ResourceRequestInfo
+ * object.
+ */
+ class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
+ private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>>> iLocMap;
+ private Iterator<Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>> iExecTypeMap;
+ private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
+ private Iterator<ResourceRequestInfo> iResReqInfo;
+
+ public RequestInfoIterator(Iterator<Map<String,
+ Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
+ iLocationMap) {
+ this.iLocMap = iLocationMap;
+ if (iLocMap.hasNext()) {
+ iExecTypeMap = iLocMap.next().values().iterator();
+ } else {
+ iExecTypeMap =
+ new LinkedList<Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>>().iterator();
+ }
+ if (iExecTypeMap.hasNext()) {
+ iCapMap = iExecTypeMap.next().values().iterator();
+ } else {
+ iCapMap =
+ new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
+ .iterator();
+ }
+ if (iCapMap.hasNext()) {
+ iResReqInfo = iCapMap.next().values().iterator();
+ } else {
+ iResReqInfo = new LinkedList<ResourceRequestInfo>().iterator();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iLocMap.hasNext()
+ || iExecTypeMap.hasNext()
+ || iCapMap.hasNext()
+ || iResReqInfo.hasNext();
+ }
+
+ @Override
+ public ResourceRequestInfo next() {
+ if (!iResReqInfo.hasNext()) {
+ if (!iCapMap.hasNext()) {
+ if (!iExecTypeMap.hasNext()) {
+ iExecTypeMap = iLocMap.next().values().iterator();
+ }
+ iCapMap = iExecTypeMap.next().values().iterator();
+ }
+ iResReqInfo = iCapMap.next().values().iterator();
+ }
+ return iResReqInfo.next();
+ }
+
+ @Override
+ public void remove() { // read-only iterator: removal is always rejected
+ throw new UnsupportedOperationException("Remove is not supported" +
+ " for this iterator !!");
+ }
+ }
+
+ // Nested map keyed by:
+ // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
+ // with value: ResourceRequestInfo
+ private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
+
+ @Override
+ public Iterator<ResourceRequestInfo> iterator() {
+ return new RequestInfoIterator(remoteRequestsTable.values().iterator());
+ }
+
+ ResourceRequestInfo get(Priority priority, String location,
+ ExecutionType execType, Resource capability) {
+ TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ getCapabilityMap(priority, location, execType);
+ if (capabilityMap == null) {
+ return null;
+ }
+ return capabilityMap.get(capability);
+ }
+
+ void put(Priority priority, String resourceName, ExecutionType execType,
+ Resource capability, ResourceRequestInfo resReqInfo) {
+ Map<String, Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>> locationMap =
+ remoteRequestsTable.get(priority);
+ if (locationMap == null) {
+ locationMap = new HashMap<>();
+ this.remoteRequestsTable.put(priority, locationMap);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added priority=" + priority);
+ }
+ }
+ Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
+ locationMap.get(resourceName);
+ if (execTypeMap == null) {
+ execTypeMap = new HashMap<>();
+ locationMap.put(resourceName, execTypeMap);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added resourceName=" + resourceName);
+ }
+ }
+ TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ execTypeMap.get(execType);
+ if (capabilityMap == null) {
+ capabilityMap = new TreeMap<>(resourceComparator);
+ execTypeMap.put(execType, capabilityMap);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added Execution Type=" + execType);
+ }
+ }
+ capabilityMap.put(capability, resReqInfo);
+ }
+
+ ResourceRequestInfo remove(Priority priority, String resourceName,
+ ExecutionType execType, Resource capability) {
+ ResourceRequestInfo retVal = null;
+ Map<String, Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
+ if (locationMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No such priority=" + priority);
+ }
+ return null;
+ }
+ Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+ execTypeMap = locationMap.get(resourceName);
+ if (execTypeMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No such resourceName=" + resourceName);
+ }
+ return null;
+ }
+ TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ execTypeMap.get(execType);
+ if (capabilityMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No such Execution Type=" + execType);
+ }
+ return null;
+ }
+ retVal = capabilityMap.remove(capability);
+ if (capabilityMap.size() == 0) {
+ execTypeMap.remove(execType);
+ if (execTypeMap.size() == 0) {
+ locationMap.remove(resourceName);
+ if (locationMap.size() == 0) {
+ this.remoteRequestsTable.remove(priority);
+ }
+ }
+ }
+ return retVal;
+ }
+
+ Map<String, Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>> getLocationMap(Priority priority) {
+ return remoteRequestsTable.get(priority);
+ }
+
+ Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+ getExecutionTypeMap(Priority priority, String location) {
+ Map<String, Map<ExecutionType, TreeMap<Resource,
+ ResourceRequestInfo>>> locationMap = getLocationMap(priority);
+ if (locationMap == null) {
+ return null;
+ }
+ return locationMap.get(location);
+ }
+
+ TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
+ priority, String location,
+ ExecutionType execType) {
+ Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+ executionTypeMap = getExecutionTypeMap(priority, location);
+ if (executionTypeMap == null) {
+ return null;
+ }
+ return executionTypeMap.get(execType);
+ }
+
+ // Gathers the infos stored under every (location, execution type) pair.
+ List<ResourceRequestInfo> getAllResourceRequestInfos(Priority priority,
+ Collection<String> locations) {
+ List<ResourceRequestInfo> retList = new LinkedList<>();
+ for (String location : locations) {
+ for (ExecutionType eType : ExecutionType.values()) {
+ TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ getCapabilityMap(priority, location, eType);
+ if (capabilityMap != null) {
+ retList.addAll(capabilityMap.values());
+ }
+ }
+ }
+ return retList;
+ }
+
+ List<ResourceRequestInfo> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ Resource capability) {
+ List<ResourceRequestInfo> list = new LinkedList<>();
+ TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ getCapabilityMap(priority, resourceName, executionType);
+ if (capabilityMap != null) {
+ ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
+ if (resourceRequestInfo != null) {
+ list.add(resourceRequestInfo);
+ } else {
+ list.addAll(capabilityMap.tailMap(capability).values());
+ }
+ }
+ return list;
+ }
+
+ @SuppressWarnings("unchecked")
+ ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
+ ExecutionTypeRequest execTypeReq, Resource capability, T req,
+ boolean relaxLocality, String labelExpression) {
+ ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
+ execTypeReq.getExecutionType(), capability);
+ if (resourceRequestInfo == null) {
+ resourceRequestInfo =
+ new ResourceRequestInfo(priority, resourceName, capability,
+ relaxLocality);
+ put(priority, resourceName, execTypeReq.getExecutionType(), capability,
+ resourceRequestInfo);
+ }
+ resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq);
+ resourceRequestInfo.remoteRequest.setNumContainers(
+ resourceRequestInfo.remoteRequest.getNumContainers() + 1);
+
+ if (relaxLocality) {
+ resourceRequestInfo.containerRequests.add(req);
+ }
+
+ if (ResourceRequest.ANY.equals(resourceName)) {
+ resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
+ }
+ return resourceRequestInfo;
+ }
+
+ ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
+ ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+ ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
+ execTypeReq.getExecutionType(), capability);
+ // No entry under this key: log it and return null without touching the table.
+ if (resourceRequestInfo == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as ResourceRequestInfo with " +
+ "priority=" + priority + ", " +
+ "resourceName=" + resourceName + ", " +
+ "executionType=" + execTypeReq + ", " +
+ "capability=" + capability + " is not present in request table");
+ }
+ return null;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + resourceRequestInfo.remoteRequest.getNumContainers());
+ }
+
+ resourceRequestInfo.remoteRequest.setNumContainers(
+ resourceRequestInfo.remoteRequest.getNumContainers() - 1);
+
+ resourceRequestInfo.containerRequests.remove(req);
+
+ if (resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
+ // guard against spurious removals
+ resourceRequestInfo.remoteRequest.setNumContainers(0);
+ }
+ return resourceRequestInfo;
+ }
+
+ boolean isEmpty() {
+ return remoteRequestsTable.isEmpty();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
new file mode 100644
index 0000000..0b62054
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base test case to be used for Testing frameworks that use AMRMProxy.
+ */
+public abstract class BaseAMRMProxyE2ETest {
+
+ protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
+ ApplicationId appId, MiniYARNCluster cluster,
+ final Configuration yarnConf)
+ throws IOException, InterruptedException, YarnException {
+
+ UserGroupInformation user = null;
+
+ // Get the AMRMToken from AMRMProxy
+
+ ApplicationReport report = rmClient.getApplicationReport(appId);
+
+ user = UserGroupInformation.createProxyUser(
+ report.getCurrentApplicationAttemptId().toString(),
+ UserGroupInformation.getCurrentUser());
+
+ ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
+ .getNodeManager(0).getNMContext().getContainerManager();
+
+ AMRMProxyTokenSecretManager amrmTokenSecretManager =
+ containerManager.getAMRMProxyService().getSecretManager();
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+ amrmTokenSecretManager
+ .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
+
+ SecurityUtil.setTokenService(token,
+ containerManager.getAMRMProxyService().getBindAddress());
+ user.addToken(token);
+
+ // Start Application Master
+
+ return user
+ .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+ @Override
+ public ApplicationMasterProtocol run() throws Exception {
+ return ClientRMProxy.createRMProxy(yarnConf,
+ ApplicationMasterProtocol.class);
+ }
+ });
+ }
+
+ protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
+ // The test needs AMRMClient to create a real allocate request
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
+ new AMRMClientImpl<>();
+
+ Resource capability = Resource.newInstance(1024, 2);
+ Priority priority = Priority.newInstance(1);
+ List<NodeReport> nodeReports = listNode;
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String[] nodes = new String[] {node};
+
+ AMRMClient.ContainerRequest storedContainer1 =
+ new AMRMClient.ContainerRequest(capability, nodes, null, priority);
+ amClient.addContainerRequest(storedContainer1);
+ amClient.addContainerRequest(storedContainer1);
+
+ List<ResourceRequest> resourceAsk = new ArrayList<>();
+ for (ResourceRequest rr : amClient.ask) {
+ resourceAsk.add(rr);
+ }
+
+ ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
+ .newInstance(new ArrayList<>(), new ArrayList<>());
+
+ int responseId = 1;
+
+ return AllocateRequest.newInstance(responseId, 0, resourceAsk,
+ new ArrayList<>(), resourceBlacklistRequest);
+ }
+
+ protected ApplicationAttemptId createApp(YarnClient yarnClient,
+ MiniYARNCluster yarnCluster, Configuration conf) throws Exception {
+ // Submits a minimal app and waits until its current attempt is LAUNCHED.
+ ApplicationSubmissionContext appContext =
+ yarnClient.createApplication().getApplicationSubmissionContext();
+ ApplicationId appId = appContext.getApplicationId();
+
+ appContext.setApplicationName("Test");
+
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ appContext.setPriority(pri);
+
+ appContext.setQueue("default");
+
+ ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+ Collections.<String, LocalResource> emptyMap(),
+ new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
+ new HashMap<String, ByteBuffer>(), null,
+ new HashMap<ApplicationAccessType, String>());
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(Resource.newInstance(1024, 1));
+
+ SubmitApplicationRequest appRequest =
+ Records.newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+
+ yarnClient.submitApplication(appContext);
+
+ RMAppAttempt appAttempt = null;
+ ApplicationAttemptId attemptId = null;
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport
+ .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+ attemptId =
+ appReport.getCurrentApplicationAttemptId();
+ appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+ .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+ // Poll with a short pause rather than spinning on the attempt state.
+ while (appAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
+ Thread.sleep(100);
+ }
+ break;
+ }
+ Thread.sleep(100);
+ }
+ Thread.sleep(1000);
+ // Just dig into the ResourceManager and get the AMRMToken just for the sake
+ // of testing.
+ UserGroupInformation.setLoginUser(UserGroupInformation
+ .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+ // emulate RM setup of AMRM token in credentials by adding the token
+ // *before* setting the token service
+ UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+ appAttempt.getAMRMToken().setService(
+ ClientRMProxy.getAMRMTokenService(conf));
+ return attemptId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 75b49d0..99bfca5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -413,11 +414,13 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer3);
// test addition and storage
- int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
- containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
- .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
List<? extends Collection<ContainerRequest>> matches =
amClient.getMatchingRequests(priority, node, capability);
@@ -919,12 +922,15 @@ public class TestAMRMClient {
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
- int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
- .get(node).get(capability).remoteRequest.getNumContainers();
- int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
- .get(rack).get(capability).remoteRequest.getNumContainers();
- int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+ node, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+ rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index cb8c86a..2db33c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -26,6 +26,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -35,6 +37,46 @@ import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.junit.Test;
public class TestAMRMClientContainerRequest {
+
+ @Test
+ public void testOpportunisticAndGuaranteedRequests() {
+ AMRMClientImpl<ContainerRequest> client =
+ new AMRMClientImpl<ContainerRequest>();
+
+ Configuration conf = new Configuration();
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest request =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ new String[] {"/rack2"}, Priority.newInstance(1));
+ client.addContainerRequest(request);
+ verifyResourceRequest(client, request, "host1", true);
+ verifyResourceRequest(client, request, "host2", true);
+ verifyResourceRequest(client, request, "/rack1", true);
+ verifyResourceRequest(client, request, "/rack2", true);
+ verifyResourceRequest(client, request, ResourceRequest.ANY, true);
+ ContainerRequest request2 =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ new String[] {"/rack2"}, Priority.newInstance(1), true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true));
+ client.addContainerRequest(request2);
+ verifyResourceRequest(client, request2, "host1", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request2, "host2", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request2, "/rack1", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request2, "/rack2", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request2, ResourceRequest.ANY, true,
+ ExecutionType.OPPORTUNISTIC);
+ }
+
@Test
public void testFillInRacks() {
AMRMClientImpl<ContainerRequest> client =
@@ -224,8 +266,16 @@ public class TestAMRMClientContainerRequest {
private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality) {
- ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
- .get(location).get(request.getCapability()).remoteRequest;
+ verifyResourceRequest(client, request, location, expectedRelaxLocality,
+ ExecutionType.GUARANTEED);
+ }
+
+ private void verifyResourceRequest(
+ AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+ String location, boolean expectedRelaxLocality,
+ ExecutionType executionType) {
+ ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
+ location, executionType, request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
index f1e3f03..33f7527 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
@@ -19,20 +19,12 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -40,43 +32,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
-public class TestAMRMProxy {
+/**
+ * End-to-End test cases for the AMRMProxy Service.
+ */
+public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class);
@@ -84,7 +58,7 @@ public class TestAMRMProxy {
* This test validates register, allocate and finish of an application through
* the AMRMPRoxy.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 120000)
public void testAMRMProxyE2E() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
YarnClient rmClient = null;
@@ -107,7 +81,8 @@ public class TestAMRMProxy {
// Submit application
- ApplicationId appId = createApp(rmClient, cluster);
+ ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+ ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@@ -173,7 +148,7 @@ public class TestAMRMProxy {
* that the received token is different from the previous one within 5
* requests.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 120000)
public void testE2ETokenRenewal() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
@@ -201,7 +176,8 @@ public class TestAMRMProxy {
// Submit
- ApplicationId appId = createApp(rmClient, cluster);
+ ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+ ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@@ -252,7 +228,7 @@ public class TestAMRMProxy {
* This test validates that an AM cannot register directly to the RM, with the
* token provided by the AMRMProxy.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 120000)
public void testE2ETokenSwap() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
YarnClient rmClient = null;
@@ -270,7 +246,8 @@ public class TestAMRMProxy {
rmClient.init(yarnConf);
rmClient.start();
- ApplicationId appId = createApp(rmClient, cluster);
+ ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+ ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@@ -290,124 +267,4 @@ public class TestAMRMProxy {
cluster.stop();
}
}
-
- protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
- ApplicationId appId, MiniYARNCluster cluster,
- final Configuration yarnConf)
- throws IOException, InterruptedException, YarnException {
-
- UserGroupInformation user = null;
-
- // Get the AMRMToken from AMRMProxy
-
- ApplicationReport report = rmClient.getApplicationReport(appId);
-
- user = UserGroupInformation.createProxyUser(
- report.getCurrentApplicationAttemptId().toString(),
- UserGroupInformation.getCurrentUser());
-
- ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
- .getNodeManager(0).getNMContext().getContainerManager();
-
- AMRMProxyTokenSecretManager amrmTokenSecretManager =
- containerManager.getAMRMProxyService().getSecretManager();
- org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
- amrmTokenSecretManager
- .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
-
- SecurityUtil.setTokenService(token,
- containerManager.getAMRMProxyService().getBindAddress());
- user.addToken(token);
-
- // Start Application Master
-
- return user
- .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
- @Override
- public ApplicationMasterProtocol run() throws Exception {
- return ClientRMProxy.createRMProxy(yarnConf,
- ApplicationMasterProtocol.class);
- }
- });
- }
-
- protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
- // The test needs AMRMClient to create a real allocate request
- AMRMClientImpl<ContainerRequest> amClient =
- new AMRMClientImpl<ContainerRequest>();
-
- Resource capability = Resource.newInstance(1024, 2);
- Priority priority = Priority.newInstance(1);
- List<NodeReport> nodeReports = listNode;
- String node = nodeReports.get(0).getNodeId().getHost();
- String[] nodes = new String[] { node };
-
- ContainerRequest storedContainer1 =
- new ContainerRequest(capability, nodes, null, priority);
- amClient.addContainerRequest(storedContainer1);
- amClient.addContainerRequest(storedContainer1);
-
- List<ResourceRequest> resourceAsk = new ArrayList<ResourceRequest>();
- for (ResourceRequest rr : amClient.ask) {
- resourceAsk.add(rr);
- }
-
- ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
- .newInstance(new ArrayList<String>(), new ArrayList<String>());
-
- int responseId = 1;
-
- return AllocateRequest.newInstance(responseId, 0, resourceAsk,
- new ArrayList<ContainerId>(), resourceBlacklistRequest);
- }
-
- protected ApplicationId createApp(YarnClient yarnClient,
- MiniYARNCluster yarnCluster) throws Exception {
-
- ApplicationSubmissionContext appContext =
- yarnClient.createApplication().getApplicationSubmissionContext();
- ApplicationId appId = appContext.getApplicationId();
-
- appContext.setApplicationName("Test");
-
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- appContext.setPriority(pri);
-
- appContext.setQueue("default");
-
- ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
- Collections.<String, LocalResource> emptyMap(),
- new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
- new HashMap<String, ByteBuffer>(), null,
- new HashMap<ApplicationAccessType, String>());
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(Resource.newInstance(1024, 1));
-
- SubmitApplicationRequest appRequest =
- Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
-
- yarnClient.submitApplication(appContext);
-
- RMAppAttempt appAttempt = null;
- while (true) {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (appReport
- .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
- ApplicationAttemptId attemptId =
- appReport.getCurrentApplicationAttemptId();
- appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
- .get(attemptId.getApplicationId()).getCurrentAppAttempt();
- while (true) {
- if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
- break;
- }
- }
- break;
- }
- }
- Thread.sleep(1000);
- return appId;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org