You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/06/17 00:18:31 UTC
svn commit: r1493601 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/
Author: tucu
Date: Sun Jun 16 22:18:30 2013
New Revision: 1493601
URL: http://svn.apache.org/r1493601
Log:
YARN-752. In AMRMClient, automatically add corresponding rack requests for requested nodes. (sandyr via tucu)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
- copied unchanged from r1493599, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sun Jun 16 22:18:30 2013
@@ -352,6 +352,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
+ YARN-752. In AMRMClient, automatically add corresponding rack requests for
+ requested nodes. (sandyr via tucu)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sun Jun 16 22:18:30 2013
@@ -42,23 +42,36 @@ import com.google.common.collect.Immutab
public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
/**
- * Object to represent container request for resources.
- * Resources may be localized to nodes and racks.
- * Resources may be assigned priorities.
- * All getters return unmodifiable collections.
- * Can ask for multiple containers of a given type.
+ * Object to represent container request for resources. Scheduler
+ * documentation should be consulted for the specifics of how the parameters
+ * are honored.
+ * All getters return immutable values.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The racks
+ * corresponding to any hosts requested will be automatically added to
+ * this list.
+ * @param priority
+ * The priority at which to request the containers. Higher priorities have
+ * lower numerical values.
+ * @param containerCount
+ * The number of containers to request.
*/
public static class ContainerRequest {
final Resource capability;
- final ImmutableList<String> hosts;
- final ImmutableList<String> racks;
+ final List<String> nodes;
+ final List<String> racks;
final Priority priority;
final int containerCount;
- public ContainerRequest(Resource capability, String[] hosts,
+ public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, int containerCount) {
this.capability = capability;
- this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+ this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority;
this.containerCount = containerCount;
@@ -68,8 +81,8 @@ public interface AMRMClient<T extends AM
return capability;
}
- public List<String> getHosts() {
- return hosts;
+ public List<String> getNodes() {
+ return nodes;
}
public List<String> getRacks() {
@@ -103,9 +116,9 @@ public interface AMRMClient<T extends AM
* AMRMClient can remove it from its internal store.
*/
public static class StoredContainerRequest extends ContainerRequest {
- public StoredContainerRequest(Resource capability, String[] hosts,
+ public StoredContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority) {
- super(capability, hosts, racks, priority, 1);
+ super(capability, nodes, racks, priority, 1);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sun Jun 16 22:18:30 2013
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.base.Joiner;
import com.google.common.annotations.VisibleForTesting;
@@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends Co
//Key -> Priority
//Value -> Map
- //Key->ResourceName (e.g., hostname, rackname, *)
+ //Key->ResourceName (e.g., nodename, rackname, *)
//Value->Map
//Key->Resource Capability
//Value->ResourceRequest
@@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends Co
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ RackResolver.init(conf);
super.serviceInit(conf);
}
@@ -309,22 +314,37 @@ public class AMRMClientImpl<T extends Co
@Override
public synchronized void addContainerRequest(T req) {
- // Create resource requests
- // add check for dup locations
- if (req.hosts != null) {
- for (String host : req.hosts) {
- addResourceRequest(req.priority, host, req.capability,
- req.containerCount, req);
+ Set<String> allRacks = new HashSet<String>();
+ if (req.racks != null) {
+ allRacks.addAll(req.racks);
+ if(req.racks.size() != allRacks.size()) {
+ Joiner joiner = Joiner.on(',');
+ LOG.warn("ContainerRequest has duplicate racks: "
+ + joiner.join(req.racks));
}
}
-
- if (req.racks != null) {
- for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability,
+ allRacks.addAll(resolveRacks(req.nodes));
+
+ if (req.nodes != null) {
+ HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
+ if(dedupedNodes.size() != req.nodes.size()) {
+ Joiner joiner = Joiner.on(',');
+ LOG.warn("ContainerRequest has duplicate nodes: "
+ + joiner.join(req.nodes));
+ }
+ for (String node : dedupedNodes) {
+ // Ensure node requests are accompanied by requests for
+ // corresponding rack
+ addResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
}
}
+ for (String rack : allRacks) {
+ addResourceRequest(req.priority, rack, req.capability,
+ req.containerCount, req);
+ }
+
// Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount, req);
@@ -332,19 +352,23 @@ public class AMRMClientImpl<T extends Co
@Override
public synchronized void removeContainerRequest(T req) {
+ Set<String> allRacks = new HashSet<String>();
+ if (req.racks != null) {
+ allRacks.addAll(req.racks);
+ }
+ allRacks.addAll(resolveRacks(req.nodes));
+
// Update resource requests
- if (req.hosts != null) {
- for (String hostName : req.hosts) {
- decResourceRequest(req.priority, hostName, req.capability,
+ if (req.nodes != null) {
+ for (String node : new HashSet<String>(req.nodes)) {
+ decResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
}
}
- if (req.racks != null) {
- for (String rack : req.racks) {
- decResourceRequest(req.priority, rack, req.capability,
- req.containerCount, req);
- }
+ for (String rack : allRacks) {
+ decResourceRequest(req.priority, rack, req.capability,
+ req.containerCount, req);
}
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
@@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends Co
return list;
}
+ private Set<String> resolveRacks(List<String> nodes) {
+ Set<String> racks = new HashSet<String>();
+ if (nodes != null) {
+ for (String node : nodes) {
+ // Ensure node requests are accompanied by requests for
+ // corresponding rack
+ String rack = RackResolver.resolve(node).getNetworkLocation();
+ if (rack == null) {
+ LOG.warn("Failed to resolve rack for node " + node + ".");
+ } else {
+ racks.add(rack);
+ }
+ }
+ }
+
+ return racks;
+ }
+
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sun Jun 16 22:18:30 2013
@@ -267,6 +267,51 @@ public class TestAMRMClient {
assertTrue(matches.size() == 1);
assertTrue(matches.get(0).size() == matchSize);
}
+
+ @Test (timeout=60000)
+ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+ AMRMClientImpl<StoredContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ Resource capability = Resource.newInstance(1024, 2);
+
+ StoredContainerRequest storedContainer1 =
+ new StoredContainerRequest(capability, nodes, null, priority);
+ amClient.addContainerRequest(storedContainer1);
+
+ // verify matching with original node and inferred rack
+ List<? extends Collection<StoredContainerRequest>> matches;
+ StoredContainerRequest storedRequest;
+ // exact match node
+ matches = amClient.getMatchingRequests(priority, node, capability);
+ verifyMatches(matches, 1);
+ storedRequest = matches.get(0).iterator().next();
+ assertTrue(storedContainer1 == storedRequest);
+ // inferred match rack
+ matches = amClient.getMatchingRequests(priority, rack, capability);
+ verifyMatches(matches, 1);
+ storedRequest = matches.get(0).iterator().next();
+ assertTrue(storedContainer1 == storedRequest);
+
+ // inferred rack match no longer valid after request is removed
+ amClient.removeContainerRequest(storedContainer1);
+ matches = amClient.getMatchingRequests(priority, rack, capability);
+ assertTrue(matches.isEmpty());
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
@Test (timeout=60000)
public void testAMRMClientMatchStorage() throws YarnException, IOException {