You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/12 20:41:21 UTC
[06/50] [abbrv] hadoop git commit: YARN-5587. Add support for
resource profiles. (vvasudev via asuresh)
YARN-5587. Add support for resource profiles. (vvasudev via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6708ac33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6708ac33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6708ac33
Branch: refs/heads/YARN-5972
Commit: 6708ac330147b2d3816a31f2ee83e09c41fe0dd9
Parents: c2032e2
Author: Arun Suresh <as...@apache.org>
Authored: Tue Nov 15 01:01:07 2016 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Sep 12 09:19:10 2017 -0700
----------------------------------------------------------------------
.../dev-support/findbugs-exclude.xml | 4 +
.../RegisterApplicationMasterResponse.java | 8 +
.../yarn/api/records/ProfileCapability.java | 94 ++++++++++-
.../hadoop/yarn/api/records/Resource.java | 14 ++
.../yarn/api/records/ResourceInformation.java | 57 ++++++-
.../yarn/api/records/ResourceRequest.java | 43 ++++-
.../hadoop-yarn/hadoop-yarn-client/pom.xml | 1 +
.../hadoop/yarn/client/api/AMRMClient.java | 117 +++++++++++++-
.../yarn/client/api/impl/AMRMClientImpl.java | 152 ++++++++++-------
.../client/api/impl/RemoteRequestsTable.java | 109 +++++++++----
.../yarn/client/api/impl/TestAMRMClient.java | 141 ++++++++++++++--
.../impl/TestAMRMClientContainerRequest.java | 8 +-
.../api/impl/TestDistributedScheduling.java | 12 +-
.../yarn/client/api/impl/TestNMClient.java | 5 +-
.../TestOpportunisticContainerAllocation.java | 31 ++--
.../src/test/resources/resource-profiles.json | 18 +++
...RegisterApplicationMasterResponsePBImpl.java | 58 +++++++
.../api/records/impl/pb/ResourcePBImpl.java | 4 +-
.../records/impl/pb/ResourceRequestPBImpl.java | 41 ++++-
.../yarn/util/resource/ResourceUtils.java | 161 ++++++++++++++++++-
.../hadoop/yarn/util/resource/Resources.java | 10 +-
.../ApplicationMasterService.java | 1 +
.../resourcemanager/DefaultAMSProcessor.java | 8 +
.../server/resourcemanager/RMServerUtils.java | 50 ++++++
.../resource/ResourceProfilesManagerImpl.java | 4 +
.../scheduler/AbstractYarnScheduler.java | 48 +++++-
.../scheduler/ClusterNodeTracker.java | 3 +-
.../scheduler/SchedulerUtils.java | 10 ++
.../scheduler/capacity/CapacityScheduler.java | 4 +-
.../scheduler/fair/FairScheduler.java | 4 +-
.../scheduler/fifo/FifoScheduler.java | 13 +-
.../yarn/server/resourcemanager/MockRM.java | 2 +
.../server/resourcemanager/TestAppManager.java | 1 +
.../TestApplicationMasterService.java | 35 ++++
.../scheduler/fair/TestFairScheduler.java | 4 +
.../hadoop/yarn/server/MiniYARNCluster.java | 2 +
36 files changed, 1102 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6825a36..ce7a9c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -154,6 +154,10 @@
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
+ <Class name="org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl$ProfileCapabilityComparator" />
+ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+ </Match>
+ <Match>
<Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" />
<Field name="builder" />
<Bug pattern="SE_BAD_FIELD" />
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 0b886dd..8fa8563 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -204,4 +204,12 @@ public abstract class RegisterApplicationMasterResponse {
@Unstable
public abstract void setSchedulerResourceTypes(
EnumSet<SchedulerResourceTypes> types);
+
+ @Public
+ @Unstable
+ public abstract Map<String, Resource> getResourceProfiles();
+
+ @Private
+ @Unstable
+ public abstract void setResourceProfiles(Map<String, Resource> profiles);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
index 0a93b89..faaddd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
@@ -18,41 +18,93 @@
package org.apache.hadoop.yarn.api.records;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.util.Records;
+import java.util.Map;
+
/**
* Class to capture capability requirements when using resource profiles. The
* ProfileCapability is meant to be used as part of the ResourceRequest. A
* profile capability has two pieces - the resource profile name and the
* overrides. The resource profile specifies the name of the resource profile
* to be used and the capability override is the overrides desired on specific
- * resource types. For example, you could use the "minimum" profile and set the
- * memory in the capability override to 4096M. This implies that you wish for
- * the resources specified in the "minimum" profile but with 4096M memory. The
- * conversion from the ProfileCapability to the Resource class with the actual
- * resource requirements will be done by the ResourceManager, which has the
- * actual profile to Resource mapping.
+ * resource types.
+ *
+ * For example, if you have a resource profile "small" that maps to
+ * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
+ * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
+ * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
+ *
+ * Note that the conversion from the ProfileCapability to the Resource class
+ * with the actual resource requirements will be done by the ResourceManager,
+ * which has the actual profile to Resource mapping.
+ *
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class ProfileCapability {
+ public static final String DEFAULT_PROFILE = "default";
+
+ public static ProfileCapability newInstance(Resource override) {
+ return newInstance(DEFAULT_PROFILE, override);
+ }
+
+ public static ProfileCapability newInstance(String profile) {
+ Preconditions
+ .checkArgument(profile != null, "The profile name cannot be null");
+ ProfileCapability obj = Records.newRecord(ProfileCapability.class);
+ obj.setProfileName(profile);
+ obj.setProfileCapabilityOverride(Resource.newInstance(0, 0));
+ return obj;
+ }
+
public static ProfileCapability newInstance(String profile,
Resource override) {
+ Preconditions
+ .checkArgument(profile != null, "The profile name cannot be null");
ProfileCapability obj = Records.newRecord(ProfileCapability.class);
obj.setProfileName(profile);
obj.setProfileCapabilityOverride(override);
return obj;
}
+ /**
+ * Get the profile name.
+ * @return the profile name
+ */
public abstract String getProfileName();
+ /**
+ * Get the profile capability override.
+ * @return Resource object containing the override.
+ */
public abstract Resource getProfileCapabilityOverride();
+ /**
+ * Set the resource profile name.
+ * @param profileName the resource profile name
+ */
public abstract void setProfileName(String profileName);
+ /**
+ * Set the capability override to override specific resource types on the
+ * resource profile.
+ *
+ * For example, if you have a resource profile "small" that maps to
+ * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
+ * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
+ * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
+ *
+ * Note that the conversion from the ProfileCapability to the Resource class
+ * with the actual resource requirements will be done by the ResourceManager,
+ * which has the actual profile to Resource mapping.
+ *
+ * @param r Resource object containing the capability override
+ */
public abstract void setProfileCapabilityOverride(Resource r);
@Override
@@ -85,4 +137,34 @@ public abstract class ProfileCapability {
return "{ profile: " + this.getProfileName() + ", capabilityOverride: "
+ this.getProfileCapabilityOverride() + " }";
}
+
+ /**
+ * Get a representation of the capability as a Resource object.
+ * @param capability the capability we wish to convert
+ * @param resourceProfilesMap map of profile name to Resource object
+ * @return Resource object representing the capability
+ */
+ public static Resource toResource(ProfileCapability capability,
+ Map<String, Resource> resourceProfilesMap) {
+ Preconditions
+ .checkArgument(capability != null, "Capability cannot be null");
+ Preconditions.checkArgument(resourceProfilesMap != null,
+ "Resource profiles map cannot be null");
+ Resource resource = Resource.newInstance(0, 0);
+
+ if (resourceProfilesMap.containsKey(capability.getProfileName())) {
+ resource = Resource
+ .newInstance(resourceProfilesMap.get(capability.getProfileName()));
+ }
+
+ if(capability.getProfileCapabilityOverride()!= null) {
+ for (Map.Entry<String, ResourceInformation> entry : capability
+ .getProfileCapabilityOverride().getResources().entrySet()) {
+ if (entry.getValue() != null && entry.getValue().getValue() != 0) {
+ resource.setResourceInformation(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ return resource;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 507247e..c349a32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -101,6 +103,18 @@ public abstract class Resource implements Comparable<Resource> {
return new SimpleResource(memory, vCores);
}
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static Resource newInstance(Resource resource) {
+ Resource ret = Resource.newInstance(0, 0);
+ for (Map.Entry<String, ResourceInformation> entry : resource.getResources()
+ .entrySet()) {
+ ret.setResourceInformation(entry.getKey(),
+ ResourceInformation.newInstance(entry.getValue()));
+ }
+ return ret;
+ }
+
/**
* This method is DEPRECATED:
* Use {@link Resource#getMemorySize()} instead
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index a17e81b..7d74efc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
private String units;
private ResourceTypes resourceType;
private Long value;
+ private Long minimumAllocation;
+ private Long maximumAllocation;
private static final String MEMORY_URI = "memory-mb";
private static final String VCORES_URI = "vcores";
@@ -118,6 +120,42 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
}
/**
+ * Get the minimum allocation for the resource.
+ *
+ * @return the minimum allocation for the resource
+ */
+ public Long getMinimumAllocation() {
+ return minimumAllocation;
+ }
+
+ /**
+ * Set the minimum allocation for the resource.
+ *
+ * @param minimumAllocation the minimum allocation for the resource
+ */
+ public void setMinimumAllocation(Long minimumAllocation) {
+ this.minimumAllocation = minimumAllocation;
+ }
+
+ /**
+ * Get the maximum allocation for the resource.
+ *
+ * @return the maximum allocation for the resource
+ */
+ public Long getMaximumAllocation() {
+ return maximumAllocation;
+ }
+
+ /**
+ * Set the maximum allocation for the resource.
+ *
+ * @param maximumAllocation the maximum allocation for the resource
+ */
+ public void setMaximumAllocation(Long maximumAllocation) {
+ this.maximumAllocation = maximumAllocation;
+ }
+
+ /**
* Create a new instance of ResourceInformation from another object.
*
* @param other the object from which the new object should be created
@@ -129,33 +167,41 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
ret.setResourceType(other.getResourceType());
ret.setUnits(other.getUnits());
ret.setValue(other.getValue());
+ ret.setMinimumAllocation(other.getMinimumAllocation());
+ ret.setMaximumAllocation(other.getMaximumAllocation());
return ret;
}
public static ResourceInformation newInstance(String name, String units,
- Long value, ResourceTypes type) {
+ Long value, ResourceTypes type, Long minimumAllocation,
+ Long maximumAllocation) {
ResourceInformation ret = new ResourceInformation();
ret.setName(name);
ret.setResourceType(type);
ret.setUnits(units);
ret.setValue(value);
+ ret.setMinimumAllocation(minimumAllocation);
+ ret.setMaximumAllocation(maximumAllocation);
return ret;
}
public static ResourceInformation newInstance(String name, String units,
Long value) {
return ResourceInformation
- .newInstance(name, units, value, ResourceTypes.COUNTABLE);
+ .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
+ Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name, String units) {
return ResourceInformation
- .newInstance(name, units, 0L, ResourceTypes.COUNTABLE);
+ .newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L,
+ Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name, Long value) {
return ResourceInformation
- .newInstance(name, "", value, ResourceTypes.COUNTABLE);
+ .newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
+ Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name) {
@@ -165,7 +211,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
@Override
public String toString() {
return "name: " + this.name + ", units: " + this.units + ", type: "
- + resourceType + ", value: " + value;
+ + resourceType + ", value: " + value + ", minimum allocation: "
+ + minimumAllocation + ", maximum allocation: " + maximumAllocation;
}
public String getShorthandRepresentation() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 5bedc87..c1339b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -98,7 +98,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
.resourceName(hostName).capability(capability)
.numContainers(numContainers).relaxLocality(relaxLocality)
.nodeLabelExpression(labelExpression)
- .executionTypeRequest(executionTypeRequest).build();
+ .executionTypeRequest(executionTypeRequest).profileCapability(null)
+ .build();
+ }
+
+ @Public
+ @Evolving
+ public static ResourceRequest newInstance(Priority priority, String hostName,
+ Resource capability, int numContainers, boolean relaxLocality,
+ String labelExpression, ExecutionTypeRequest executionTypeRequest,
+ ProfileCapability profile) {
+ return ResourceRequest.newBuilder().priority(priority)
+ .resourceName(hostName).capability(capability)
+ .numContainers(numContainers).relaxLocality(relaxLocality)
+ .nodeLabelExpression(labelExpression)
+ .executionTypeRequest(executionTypeRequest).profileCapability(profile)
+ .build();
}
@Public
@@ -124,6 +139,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
resourceRequest.setRelaxLocality(true);
resourceRequest.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance());
+ resourceRequest.setProfileCapability(null);
}
/**
@@ -238,6 +254,21 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
}
/**
+ * Set the <code>profileCapability</code> of the request.
+ * @see ResourceRequest#setProfileCapability(ProfileCapability)
+ * @param profileCapability
+ * <code>profileCapability</code> of the request
+ * @return {@link ResourceRequestBuilder}
+ */
+ @Public
+ @Evolving
+ public ResourceRequestBuilder profileCapability(
+ ProfileCapability profileCapability) {
+ resourceRequest.setProfileCapability(profileCapability);
+ return this;
+ }
+
+ /**
* Return generated {@link ResourceRequest} object.
* @return {@link ResourceRequest}
*/
@@ -454,6 +485,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
@Evolving
public abstract void setNodeLabelExpression(String nodelabelExpression);
+ @Public
+ @Evolving
+ public abstract ProfileCapability getProfileCapability();
+
+ @Public
+ @Evolving
+ public abstract void setProfileCapability(ProfileCapability p);
+
/**
* Get the optional <em>ID</em> corresponding to this allocation request. This
* ID is an identifier for different {@code ResourceRequest}s from the <b>same
@@ -529,12 +568,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
Resource capability = getCapability();
String hostName = getResourceName();
Priority priority = getPriority();
+ ProfileCapability profile = getProfileCapability();
result =
prime * result + ((capability == null) ? 0 : capability.hashCode());
result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + getNumContainers();
result = prime * result + ((priority == null) ? 0 : priority.hashCode());
result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
+ result = prime * result + ((profile == null) ? 0 : profile.hashCode());
return result;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 8413f15..4654000 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -144,6 +144,7 @@
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
+ <exclude>src/test/resources/resource-profiles.json</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index fc64093..815915e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
@@ -117,6 +118,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
private String nodeLabelsExpression;
private ExecutionTypeRequest executionTypeRequest =
ExecutionTypeRequest.newInstance();
+ private String resourceProfile;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@@ -163,6 +165,26 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this(capability, nodes, racks, priority, allocationRequestId, true, null,
ExecutionTypeRequest.newInstance());
}
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints and
+ * locality relaxation enabled.
+ *
+ * @param capability
+ * The {@link ProfileCapability} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ */
+ public ContainerRequest(ProfileCapability capability, String[] nodes,
+ String[] racks, Priority priority) {
+ this(capability, nodes, racks, priority, 0, true, null);
+ }
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
@@ -191,6 +213,29 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
+ * The {@link ProfileCapability} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param relaxLocality
+ * If true, containers for this request may be assigned on hosts
+ * and racks other than the ones explicitly requested.
+ */
+ public ContainerRequest(ProfileCapability capability, String[] nodes,
+ String[] racks, Priority priority, boolean relaxLocality) {
+ this(capability, nodes, racks, priority, 0, relaxLocality, null);
+ }
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints.
+ *
+ * @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
@@ -277,6 +322,59 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
relaxLocality, nodeLabelsExpression,
ExecutionTypeRequest.newInstance());
}
+
+ public ContainerRequest(ProfileCapability capability, String[] nodes,
+ String[] racks, Priority priority, long allocationRequestId,
+ boolean relaxLocality, String nodeLabelsExpression) {
+ this(capability, nodes, racks, priority, allocationRequestId,
+ relaxLocality, nodeLabelsExpression,
+ ExecutionTypeRequest.newInstance());
+ }
+
+ /**
+ * Instantiates a {@link ContainerRequest} with the given constraints.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The
+ * racks corresponding to any hosts requested will be automatically
+ * added to this list.
+ * @param priority
+ * The priority at which to request the containers. Higher
+ * priorities have lower numerical values.
+ * @param allocationRequestId
+ * The allocationRequestId of the request. To be used as a tracking
+ * id to match Containers allocated against this request. Will
+ * default to 0 if not specified.
+ * @param relaxLocality
+ * If true, containers for this request may be assigned on hosts
+ * and racks other than the ones explicitly requested.
+ * @param nodeLabelsExpression
+ * The node label expression to allocate resources against;
+ * currently only a single node label is supported
+ * @param executionTypeRequest
+ * Set the execution type of the container request.
+ */
+ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
+ Priority priority, long allocationRequestId, boolean relaxLocality,
+ String nodeLabelsExpression,
+ ExecutionTypeRequest executionTypeRequest) {
+ this(capability, nodes, racks, priority, allocationRequestId,
+ relaxLocality, nodeLabelsExpression, executionTypeRequest,
+ ProfileCapability.DEFAULT_PROFILE);
+ }
+
+ public ContainerRequest(ProfileCapability capability, String[] nodes,
+ String[] racks, Priority priority, long allocationRequestId,
+ boolean relaxLocality, String nodeLabelsExpression,
+ ExecutionTypeRequest executionTypeRequest) {
+ this(capability.getProfileCapabilityOverride(), nodes, racks, priority,
+ allocationRequestId, relaxLocality, nodeLabelsExpression,
+ executionTypeRequest, capability.getProfileName());
+ }
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
@@ -304,11 +402,13 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* asking for only a single node label
* @param executionTypeRequest
* Set the execution type of the container request.
+ * @param profile
+ * The name of the resource profile to use for the container request
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, long allocationRequestId, boolean relaxLocality,
String nodeLabelsExpression,
- ExecutionTypeRequest executionTypeRequest) {
+ ExecutionTypeRequest executionTypeRequest, String profile) {
this.allocationRequestId = allocationRequestId;
this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
@@ -317,6 +417,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
this.executionTypeRequest = executionTypeRequest;
+ this.resourceProfile = profile;
sanityCheck();
}
@@ -368,6 +469,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return executionTypeRequest;
}
+ public String getResourceProfile() {
+ return resourceProfile;
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
@@ -375,6 +480,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
.append("]");
+ sb.append("Resource Profile[").append(resourceProfile).append("]");
return sb.toString();
}
@@ -627,6 +733,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
" AMRMClient is expected to implement this !!");
}
+
+ @InterfaceStability.Evolving
+ public List<? extends Collection<T>> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ ProfileCapability capability) {
+ throw new UnsupportedOperationException("The sub-class extending" +
+ " AMRMClient is expected to implement this !!");
+ }
+
/**
* Get outstanding <code>ContainerRequest</code>s matching the given
* allocationRequestId. These ContainerRequests should have been added via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 7a21bc6..8e66c20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -105,56 +106,56 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected final Set<String> blacklistedNodes = new HashSet<String>();
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
+
+ protected Map<String, Resource> resourceProfilesMap;
static class ResourceRequestInfo<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
-
+
ResourceRequestInfo(Long allocationRequestId, Priority priority,
- String resourceName, Resource capability, boolean relaxLocality) {
+ String resourceName, Resource capability, boolean relaxLocality,
+ String resourceProfile) {
+ ProfileCapability profileCapability = ProfileCapability
+ .newInstance(resourceProfile, capability);
remoteRequest = ResourceRequest.newBuilder().priority(priority)
.resourceName(resourceName).capability(capability).numContainers(0)
- .allocationRequestId(allocationRequestId)
- .relaxLocality(relaxLocality).build();
+ .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality)
+ .profileCapability(profileCapability).build();
containerRequests = new LinkedHashSet<T>();
}
}
/**
- * Class compares Resource by memory then cpu in reverse order
+ * Class compares Resource by memory, then cpu and then the remaining resource
+ * types in reverse order.
*/
- static class ResourceReverseMemoryThenCpuComparator implements
- Comparator<Resource>, Serializable {
- static final long serialVersionUID = 12345L;
- @Override
- public int compare(Resource arg0, Resource arg1) {
- long mem0 = arg0.getMemorySize();
- long mem1 = arg1.getMemorySize();
- long cpu0 = arg0.getVirtualCores();
- long cpu1 = arg1.getVirtualCores();
- if(mem0 == mem1) {
- if(cpu0 == cpu1) {
- return 0;
- }
- if(cpu0 < cpu1) {
- return 1;
- }
- return -1;
- }
- if(mem0 < mem1) {
- return 1;
- }
- return -1;
- }
+ static class ProfileCapabilityComparator<T extends ProfileCapability>
+ implements Comparator<T> {
+
+ HashMap<String, Resource> resourceProfilesMap;
+
+ public ProfileCapabilityComparator(
+ HashMap<String, Resource> resourceProfileMap) {
+ this.resourceProfilesMap = resourceProfileMap;
+ }
+
+ public int compare(T arg0, T arg1) {
+ Resource resource0 =
+ ProfileCapability.toResource(arg0, resourceProfilesMap);
+ Resource resource1 =
+ ProfileCapability.toResource(arg1, resourceProfilesMap);
+ return resource1.compareTo(resource0);
+ }
}
- static boolean canFit(Resource arg0, Resource arg1) {
- long mem0 = arg0.getMemorySize();
- long mem1 = arg1.getMemorySize();
- long cpu0 = arg0.getVirtualCores();
- long cpu1 = arg1.getVirtualCores();
-
- return (mem0 <= mem1 && cpu0 <= cpu1);
+ boolean canFit(ProfileCapability arg0, ProfileCapability arg1) {
+ Resource resource0 =
+ ProfileCapability.toResource(arg0, resourceProfilesMap);
+ Resource resource1 =
+ ProfileCapability.toResource(arg1, resourceProfilesMap);
+ return Resources.fitsIn(resource0, resource1);
+
}
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
@@ -233,6 +234,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return registerApplicationMaster();
}
+ @SuppressWarnings("unchecked")
private RegisterApplicationMasterResponse registerApplicationMaster()
throws YarnException, IOException {
RegisterApplicationMasterRequest request =
@@ -245,6 +247,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
+ this.resourceProfilesMap = response.getResourceProfiles();
}
return response;
}
@@ -416,13 +419,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
- ResourceRequest rr = ResourceRequest.newBuilder()
- .priority(r.getPriority()).resourceName(r.getResourceName())
- .capability(r.getCapability()).numContainers(r.getNumContainers())
- .relaxLocality(r.getRelaxLocality())
- .nodeLabelExpression(r.getNodeLabelExpression())
- .executionTypeRequest(r.getExecutionTypeRequest())
- .allocationRequestId(r.getAllocationRequestId()).build();
+ ResourceRequest rr =
+ ResourceRequest.newBuilder().priority(r.getPriority())
+ .resourceName(r.getResourceName()).capability(r.getCapability())
+ .numContainers(r.getNumContainers())
+ .relaxLocality(r.getRelaxLocality())
+ .nodeLabelExpression(r.getNodeLabelExpression())
+ .executionTypeRequest(r.getExecutionTypeRequest())
+ .allocationRequestId(r.getAllocationRequestId())
+ .profileCapability(r.getProfileCapability()).build();
askList.add(rr);
}
return askList;
@@ -504,6 +509,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
public synchronized void addContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
+ ProfileCapability profileCapability = ProfileCapability
+ .newInstance(req.getResourceProfile(), req.getCapability());
Set<String> dedupedRacks = new HashSet<String>();
if (req.getRacks() != null) {
dedupedRacks.addAll(req.getRacks());
@@ -516,6 +523,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
Set<String> inferredRacks = resolveRacks(req.getNodes());
inferredRacks.removeAll(dedupedRacks);
+ checkResourceProfile(req.getResourceProfile());
+
// check that specific and non-specific requests cannot be mixed within a
// priority
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
@@ -540,26 +549,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node,
- req.getExecutionTypeRequest(), req.getCapability(), req, true,
+ req.getExecutionTypeRequest(), profileCapability, req, true,
req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
- req.getCapability(), req, true, req.getNodeLabelExpression());
+ profileCapability, req, true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
- req.getCapability(), req, req.getRelaxLocality(),
+ profileCapability, req, req.getRelaxLocality(),
req.getNodeLabelExpression());
}
// Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
- req.getExecutionTypeRequest(), req.getCapability(), req,
+ req.getExecutionTypeRequest(), profileCapability, req,
req.getRelaxLocality(), req.getNodeLabelExpression());
}
@@ -567,6 +576,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
public synchronized void removeContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
+ ProfileCapability profileCapability = ProfileCapability
+ .newInstance(req.getResourceProfile(), req.getCapability());
Set<String> allRacks = new HashSet<String>();
if (req.getRacks() != null) {
allRacks.addAll(req.getRacks());
@@ -577,17 +588,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) {
decResourceRequest(req.getPriority(), node,
- req.getExecutionTypeRequest(), req.getCapability(), req);
+ req.getExecutionTypeRequest(), profileCapability, req);
}
}
for (String rack : allRacks) {
decResourceRequest(req.getPriority(), rack,
- req.getExecutionTypeRequest(), req.getCapability(), req);
+ req.getExecutionTypeRequest(), profileCapability, req);
}
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
- req.getExecutionTypeRequest(), req.getCapability(), req);
+ req.getExecutionTypeRequest(), profileCapability, req);
}
@Override
@@ -686,6 +697,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
+ ProfileCapability profileCapability =
+ ProfileCapability.newInstance(capability);
+ return getMatchingRequests(priority, resourceName, executionType,
+ profileCapability);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized List<? extends Collection<T>> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ ProfileCapability capability) {
Preconditions.checkArgument(capability != null,
"The Resource to be requested should not be null ");
Preconditions.checkArgument(priority != null,
@@ -695,22 +717,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
RemoteRequestsTable remoteRequestsTable = getTable(0);
if (null != remoteRequestsTable) {
- List<ResourceRequestInfo<T>> matchingRequests =
- remoteRequestsTable.getMatchingRequests(priority, resourceName,
- executionType, capability);
+ List<ResourceRequestInfo<T>> matchingRequests = remoteRequestsTable
+ .getMatchingRequests(priority, resourceName, executionType,
+ capability);
if (null != matchingRequests) {
// If no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
- if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
- !resReqInfo.containerRequests.isEmpty()) {
+ if (canFit(resReqInfo.remoteRequest.getProfileCapability(),
+ capability) && !resReqInfo.containerRequests.isEmpty()) {
list.add(resReqInfo.containerRequests);
}
}
}
}
// no match found
- return list;
+ return list;
}
private Set<String> resolveRacks(List<String> nodes) {
@@ -758,6 +780,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
}
+
+ private void checkResourceProfile(String profile) {
+ if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty()
+ && !resourceProfilesMap.containsKey(profile)) {
+ throw new InvalidContainerRequestException(
+ "Invalid profile name, valid profile names are " + resourceProfilesMap
+ .keySet());
+ }
+ }
/**
* Valid if a node label expression specified on container request is valid or
@@ -845,12 +876,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
private void addResourceRequest(Priority priority, String resourceName,
- ExecutionTypeRequest execTypeReq, Resource capability, T req,
+ ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
boolean relaxLocality, String labelExpression) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable == null) {
remoteRequestsTable = new RemoteRequestsTable<T>();
+ if (this.resourceProfilesMap instanceof HashMap) {
+ remoteRequestsTable.setResourceComparator(
+ new ProfileCapabilityComparator((HashMap) resourceProfilesMap));
+ }
putTable(req.getAllocationRequestId(), remoteRequestsTable);
}
@SuppressWarnings("unchecked")
@@ -863,6 +898,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
LOG.debug("addResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
@@ -872,7 +908,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
private void decResourceRequest(Priority priority, String resourceName,
- ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+ ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable != null) {
@@ -882,7 +918,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to
// override a previously sent value. If ResourceRequest was not sent
- // previously then sending 0 ought to be a no-op on RM
+ // previously then sending 0 ought to be a no-op on RM
if (resourceRequestInfo != null) {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
index 110ca79..135e1db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import java.util.Collection;
import java.util.HashMap;
@@ -35,43 +35,42 @@ import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator;
class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
- static ResourceReverseMemoryThenCpuComparator resourceComparator =
- new ResourceReverseMemoryThenCpuComparator();
+ private ProfileCapabilityComparator resourceComparator;
/**
* Nested Iterator that iterates over just the ResourceRequestInfo
* object.
*/
class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
- private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
+ private Iterator<Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>>> iLocMap;
- private Iterator<Map<ExecutionType, TreeMap<Resource,
+ private Iterator<Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> iExecTypeMap;
- private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
+ private Iterator<TreeMap<ProfileCapability, ResourceRequestInfo>> iCapMap;
private Iterator<ResourceRequestInfo> iResReqInfo;
public RequestInfoIterator(Iterator<Map<String,
- Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
+ Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>>
iLocationMap) {
this.iLocMap = iLocationMap;
if (iLocMap.hasNext()) {
iExecTypeMap = iLocMap.next().values().iterator();
} else {
iExecTypeMap =
- new LinkedList<Map<ExecutionType, TreeMap<Resource,
+ new LinkedList<Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>>().iterator();
}
if (iExecTypeMap.hasNext()) {
iCapMap = iExecTypeMap.next().values().iterator();
} else {
iCapMap =
- new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
+ new LinkedList<TreeMap<ProfileCapability, ResourceRequestInfo>>()
.iterator();
}
if (iCapMap.hasNext()) {
@@ -113,7 +112,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
// Nest map with Primary key :
// Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
// and value : ResourceRequestInfo
- private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
+ private Map<Priority, Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
@Override
@@ -122,8 +121,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
}
ResourceRequestInfo get(Priority priority, String location,
- ExecutionType execType, Resource capability) {
- TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ ExecutionType execType, ProfileCapability capability) {
+ TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, location, execType);
if (capabilityMap == null) {
return null;
@@ -131,9 +130,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
return capabilityMap.get(capability);
}
+ @SuppressWarnings("unchecked")
void put(Priority priority, String resourceName, ExecutionType execType,
- Resource capability, ResourceRequestInfo resReqInfo) {
- Map<String, Map<ExecutionType, TreeMap<Resource,
+ ProfileCapability capability, ResourceRequestInfo resReqInfo) {
+ Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> locationMap =
remoteRequestsTable.get(priority);
if (locationMap == null) {
@@ -143,8 +143,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
LOG.debug("Added priority=" + priority);
}
}
- Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
- locationMap.get(resourceName);
+ Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
+ execTypeMap = locationMap.get(resourceName);
if (execTypeMap == null) {
execTypeMap = new HashMap<>();
locationMap.put(resourceName, execTypeMap);
@@ -152,9 +152,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
LOG.debug("Added resourceName=" + resourceName);
}
}
- TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
execTypeMap.get(execType);
if (capabilityMap == null) {
+ // this can happen if the user doesn't register with the RM before
+ // calling addResourceRequest
+ if (resourceComparator == null) {
+ resourceComparator = new ProfileCapabilityComparator(new HashMap<>());
+ }
capabilityMap = new TreeMap<>(resourceComparator);
execTypeMap.put(execType, capabilityMap);
if (LOG.isDebugEnabled()) {
@@ -165,9 +170,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
}
ResourceRequestInfo remove(Priority priority, String resourceName,
- ExecutionType execType, Resource capability) {
+ ExecutionType execType, ProfileCapability capability) {
ResourceRequestInfo retVal = null;
- Map<String, Map<ExecutionType, TreeMap<Resource,
+ Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
if (locationMap == null) {
if (LOG.isDebugEnabled()) {
@@ -175,7 +180,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
}
return null;
}
- Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+ Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
execTypeMap = locationMap.get(resourceName);
if (execTypeMap == null) {
if (LOG.isDebugEnabled()) {
@@ -183,7 +188,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
}
return null;
}
- TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
execTypeMap.get(execType);
if (capabilityMap == null) {
if (LOG.isDebugEnabled()) {
@@ -204,14 +209,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
return retVal;
}
- Map<String, Map<ExecutionType, TreeMap<Resource,
+ Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> getLocationMap(Priority priority) {
return remoteRequestsTable.get(priority);
}
- Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+ Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
getExecutionTypeMap(Priority priority, String location) {
- Map<String, Map<ExecutionType, TreeMap<Resource,
+ Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> locationMap = getLocationMap(priority);
if (locationMap == null) {
return null;
@@ -219,10 +224,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
return locationMap.get(location);
}
- TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
+ TreeMap<ProfileCapability, ResourceRequestInfo> getCapabilityMap(Priority
priority, String location,
ExecutionType execType) {
- Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+ Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
executionTypeMap = getExecutionTypeMap(priority, location);
if (executionTypeMap == null) {
return null;
@@ -236,7 +241,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
List retList = new LinkedList<>();
for (String location : locations) {
for (ExecutionType eType : ExecutionType.values()) {
- TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, location, eType);
if (capabilityMap != null) {
retList.addAll(capabilityMap.values());
@@ -248,9 +253,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
List<ResourceRequestInfo> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
- Resource capability) {
+ ProfileCapability capability) {
List<ResourceRequestInfo> list = new LinkedList<>();
- TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+ TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, resourceName, executionType);
if (capabilityMap != null) {
ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
@@ -266,14 +271,15 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
@SuppressWarnings("unchecked")
ResourceRequestInfo addResourceRequest(Long allocationRequestId,
Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
- Resource capability, T req, boolean relaxLocality,
+ ProfileCapability capability, T req, boolean relaxLocality,
String labelExpression) {
- ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
- execTypeReq.getExecutionType(), capability);
+ ResourceRequestInfo resourceRequestInfo =
+ get(priority, resourceName, execTypeReq.getExecutionType(), capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
new ResourceRequestInfo(allocationRequestId, priority, resourceName,
- capability, relaxLocality);
+ capability.getProfileCapabilityOverride(), relaxLocality,
+ capability.getProfileName());
put(priority, resourceName, execTypeReq.getExecutionType(), capability,
resourceRequestInfo);
}
@@ -288,11 +294,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
if (ResourceRequest.ANY.equals(resourceName)) {
resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
+ }
return resourceRequestInfo;
}
ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
- ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+ ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
@@ -330,4 +339,34 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
return remoteRequestsTable.isEmpty();
}
+ @SuppressWarnings("unchecked")
+ public void setResourceComparator(ProfileCapabilityComparator comparator) {
+ ProfileCapabilityComparator old = this.resourceComparator;
+ this.resourceComparator = comparator;
+ if (old != null) {
+ // we've already set a resource comparator - re-create the maps with the
+ // new one. this is needed in case someone adds container requests before
+ // registering with the RM. In such a case, the comparator won't have
+ // the resource profiles map. After registration, the map is available
+ // so re-create the capabilities maps
+
+ for (Map.Entry<Priority, Map<String, Map<ExecutionType,
+ TreeMap<ProfileCapability, ResourceRequestInfo>>>>
+ priEntry : remoteRequestsTable.entrySet()) {
+ for (Map.Entry<String, Map<ExecutionType, TreeMap<ProfileCapability,
+ ResourceRequestInfo>>> nameEntry : priEntry.getValue().entrySet()) {
+ for (Map.Entry<ExecutionType, TreeMap<ProfileCapability,
+ ResourceRequestInfo>> execEntry : nameEntry
+ .getValue().entrySet()) {
+ Map<ProfileCapability, ResourceRequestInfo> capabilityMap =
+ execEntry.getValue();
+ TreeMap<ProfileCapability, ResourceRequestInfo> newCapabiltyMap =
+ new TreeMap<>(resourceComparator);
+ newCapabiltyMap.putAll(capabilityMap);
+ execEntry.setValue(newCapabiltyMap);
+ }
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index b6beb38..1994b55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -130,11 +130,13 @@ public class TestAMRMClient {
@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
- createClusterAndStartApplication();
+ createClusterAndStartApplication(conf);
}
- private void createClusterAndStartApplication() throws Exception {
+ private void createClusterAndStartApplication(Configuration conf)
+ throws Exception {
// start minicluster
+ this.conf = conf;
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
@@ -536,7 +538,8 @@ public class TestAMRMClient {
}
@Test (timeout=60000)
- public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+ public void testAMRMClientMatchingFitInferredRack()
+ throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
@@ -544,10 +547,10 @@ public class TestAMRMClient {
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
-
+
Resource capability = Resource.newInstance(1024, 2);
- ContainerRequest storedContainer1 =
+ ContainerRequest storedContainer1 =
new ContainerRequest(capability, nodes, null, priority);
amClient.addContainerRequest(storedContainer1);
@@ -564,14 +567,15 @@ public class TestAMRMClient {
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertEquals(storedContainer1, storedRequest);
-
+
// inferred rack match no longer valid after request is removed
amClient.removeContainerRequest(storedContainer1);
matches = amClient.getMatchingRequests(priority, rack, capability);
assertTrue(matches.isEmpty());
-
- amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
- null, null);
+
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
@@ -604,16 +608,19 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3);
+
+ ProfileCapability profileCapability =
+ ProfileCapability.newInstance(capability);
// test addition and storage
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
int containersRequestedAny = remoteRequestsTable.get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
containersRequestedAny = remoteRequestsTable.get(priority1,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
List<? extends Collection<ContainerRequest>> matches =
@@ -884,7 +891,7 @@ public class TestAMRMClient {
teardown();
conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, "privacy");
- createClusterAndStartApplication();
+ createClusterAndStartApplication(conf);
initAMRMClientAndTest(false);
}
@@ -1702,14 +1709,16 @@ public class TestAMRMClient {
int expAsks, int expRelease) {
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(allocationReqId);
+ ProfileCapability profileCapability =
+ ProfileCapability.newInstance(capability);
int containersRequestedNode = remoteRequestsTable.get(priority,
- node, ExecutionType.GUARANTEED, capability).remoteRequest
+ node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedRack = remoteRequestsTable.get(priority,
- rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedAny = remoteRequestsTable.get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(expNode, containersRequestedNode);
@@ -1907,4 +1916,106 @@ public class TestAMRMClient {
}
return result;
}
+
+ @Test(timeout = 60000)
+ public void testGetMatchingFitWithProfiles() throws Exception {
+ teardown();
+ conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
+ createClusterAndStartApplication(conf);
+ AMRMClient<ContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient = AMRMClient.<ContainerRequest>createAMRMClient();
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ ProfileCapability capability1 = ProfileCapability.newInstance("minimum");
+ ProfileCapability capability2 = ProfileCapability.newInstance("default");
+ ProfileCapability capability3 = ProfileCapability.newInstance("maximum");
+ ProfileCapability capability4 = ProfileCapability
+ .newInstance("minimum", Resource.newInstance(2048, 1));
+ ProfileCapability capability5 = ProfileCapability.newInstance("default");
+ ProfileCapability capability6 = ProfileCapability
+ .newInstance("default", Resource.newInstance(2048, 1));
+ // http has the same capabilities as default
+ ProfileCapability capability7 = ProfileCapability.newInstance("http");
+
+ ContainerRequest storedContainer1 =
+ new ContainerRequest(capability1, nodes, racks, priority);
+ ContainerRequest storedContainer2 =
+ new ContainerRequest(capability2, nodes, racks, priority);
+ ContainerRequest storedContainer3 =
+ new ContainerRequest(capability3, nodes, racks, priority);
+ ContainerRequest storedContainer4 =
+ new ContainerRequest(capability4, nodes, racks, priority);
+ ContainerRequest storedContainer5 =
+ new ContainerRequest(capability5, nodes, racks, priority2);
+ ContainerRequest storedContainer6 =
+ new ContainerRequest(capability6, nodes, racks, priority);
+ ContainerRequest storedContainer7 =
+ new ContainerRequest(capability7, nodes, racks, priority);
+
+
+ amClient.addContainerRequest(storedContainer1);
+ amClient.addContainerRequest(storedContainer2);
+ amClient.addContainerRequest(storedContainer3);
+ amClient.addContainerRequest(storedContainer4);
+ amClient.addContainerRequest(storedContainer5);
+ amClient.addContainerRequest(storedContainer6);
+ amClient.addContainerRequest(storedContainer7);
+
+ // test matching of containers
+ List<? extends Collection<ContainerRequest>> matches;
+ ContainerRequest storedRequest;
+ // exact match
+ ProfileCapability testCapability1 =
+ ProfileCapability.newInstance("minimum");
+ matches = amClient
+ .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+ testCapability1);
+ verifyMatches(matches, 1);
+ storedRequest = matches.get(0).iterator().next();
+ assertEquals(storedContainer1, storedRequest);
+ amClient.removeContainerRequest(storedContainer1);
+
+ // exact matching with order maintained
+ // we should get back 2 matches - default + http because they have the
+ // same capability
+ ProfileCapability testCapability2 =
+ ProfileCapability.newInstance("default");
+ matches = amClient
+ .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+ testCapability2);
+ verifyMatches(matches, 2);
+ // must be returned in the order they were made
+ int i = 0;
+ for (ContainerRequest storedRequest1 : matches.get(0)) {
+ switch(i) {
+ case 0:
+ assertEquals(storedContainer2, storedRequest1);
+ break;
+ case 1:
+ assertEquals(storedContainer7, storedRequest1);
+ break;
+ }
+ i++;
+ }
+ amClient.removeContainerRequest(storedContainer5);
+
+ // matching with larger container. all requests returned
+ Resource testCapability3 = Resource.newInstance(8192, 8);
+ matches = amClient
+ .getMatchingRequests(priority, node, testCapability3);
+ assertEquals(3, matches.size());
+
+ Resource testCapability4 = Resource.newInstance(2048, 1);
+ matches = amClient.getMatchingRequests(priority, node, testCapability4);
+ assertEquals(1, matches.size());
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index 9603539..c87123a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -276,9 +277,10 @@ public class TestAMRMClientContainerRequest {
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality,
ExecutionType executionType) {
- ResourceRequest ask = client.getTable(0)
- .get(request.getPriority(), location, executionType,
- request.getCapability()).remoteRequest;
+ ProfileCapability profileCapability = ProfileCapability
+ .newInstance(request.getResourceProfile(), request.getCapability());
+ ResourceRequest ask = client.getTable(0).get(request.getPriority(),
+ location, executionType, profileCapability).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index e180f6d..00f5e03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@@ -387,18 +388,21 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
+ ProfileCapability profileCapability =
+ ProfileCapability.newInstance(capability);
+
int containersRequestedNode = remoteRequestsTable.get(priority,
- node, ExecutionType.GUARANTEED, capability).remoteRequest
+ node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedRack = remoteRequestsTable.get(priority,
- rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedAny = remoteRequestsTable.get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
remoteRequestsTable.get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index b23a923..016f1bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -255,9 +256,11 @@ public class TestNMClient {
racks, priority));
}
+ ProfileCapability profileCapability =
+ ProfileCapability.newInstance(capability);
int containersRequestedAny = rmClient.getTable(0)
.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
- capability).remoteRequest.getNumContainers();
+ profileCapability).remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index 305d18b..12c32fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@@ -99,6 +100,7 @@ public class TestOpportunisticContainerAllocation {
private static final long AM_EXPIRE_MS = 4000;
private static Resource capability;
+ private static ProfileCapability profileCapability;
private static Priority priority;
private static Priority priority2;
private static Priority priority3;
@@ -151,6 +153,7 @@ public class TestOpportunisticContainerAllocation {
priority3 = Priority.newInstance(3);
priority4 = Priority.newInstance(4);
capability = Resource.newInstance(512, 1);
+ profileCapability = ProfileCapability.newInstance(capability);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
@@ -273,7 +276,7 @@ public class TestOpportunisticContainerAllocation {
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(1, oppContainersRequestedAny);
@@ -394,7 +397,7 @@ public class TestOpportunisticContainerAllocation {
new AMRMClient.ContainerRequest(capability, null, null, priority3));
int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(1, guarContainersRequestedAny);
@@ -512,6 +515,7 @@ public class TestOpportunisticContainerAllocation {
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
+
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
@@ -532,17 +536,17 @@ public class TestOpportunisticContainerAllocation {
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.getTable(0).get(priority,
- node, ExecutionType.GUARANTEED, capability).remoteRequest
+ node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.getTable(0).get(priority,
- rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.getTable(0).get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(4, containersRequestedNode);
@@ -564,17 +568,17 @@ public class TestOpportunisticContainerAllocation {
ExecutionType.OPPORTUNISTIC, true)));
containersRequestedNode = amClient.getTable(0).get(priority,
- node, ExecutionType.GUARANTEED, capability).remoteRequest
+ node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
containersRequestedRack = amClient.getTable(0).get(priority,
- rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
containersRequestedAny = amClient.getTable(0).get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
@@ -691,10 +695,9 @@ public class TestOpportunisticContainerAllocation {
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
- int oppContainersRequestedAny =
- amClient.getTable(0).get(priority3, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, capability).remoteRequest
- .getNumContainers();
+ int oppContainersRequestedAny = amClient.getTable(0)
+ .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+ profileCapability).remoteRequest.getNumContainers();
assertEquals(2, oppContainersRequestedAny);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6708ac33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
new file mode 100644
index 0000000..d0f3f72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
@@ -0,0 +1,18 @@
+{
+ "minimum": {
+ "memory-mb" : 1024,
+ "vcores" : 1
+ },
+ "default" : {
+ "memory-mb" : 2048,
+ "vcores" : 2
+ },
+ "maximum" : {
+ "memory-mb": 4096,
+ "vcores" : 4
+ },
+ "http" : {
+ "memory-mb" : 2048,
+ "vcores" : 2
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org