You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/06/17 00:18:31 UTC

svn commit: r1493601 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/

Author: tucu
Date: Sun Jun 16 22:18:30 2013
New Revision: 1493601

URL: http://svn.apache.org/r1493601
Log:
YARN-752. In AMRMClient, automatically add corresponding rack requests for requested nodes. (sandyr via tucu)

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
      - copied unchanged from r1493599, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sun Jun 16 22:18:30 2013
@@ -352,6 +352,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
     use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
 
+    YARN-752. In AMRMClient, automatically add corresponding rack requests for 
+    requested nodes. (sandyr via tucu)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sun Jun 16 22:18:30 2013
@@ -42,23 +42,36 @@ import com.google.common.collect.Immutab
 public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
 
   /**
-   * Object to represent container request for resources.
-   * Resources may be localized to nodes and racks.
-   * Resources may be assigned priorities.
-   * All getters return unmodifiable collections.
-   * Can ask for multiple containers of a given type.
+   * Object to represent container request for resources. Scheduler
+   * documentation should be consulted for the specifics of how the parameters
+   * are honored.
+   * All getters return immutable values.
+   * 
+   * @param capability
+   *    The {@link Resource} to be requested for each container.
+   * @param nodes
+   *    Any hosts to request that the containers are placed on.
+   * @param racks
+   *    Any racks to request that the containers are placed on. The racks
+   *    corresponding to any hosts requested will be automatically added to
+   *    this list.
+   * @param priority
+   *    The priority at which to request the containers. Higher priorities have
+   *    lower numerical values.
+   * @param containerCount
+   *    The number of containers to request.
    */
   public static class ContainerRequest {
     final Resource capability;
-    final ImmutableList<String> hosts;
-    final ImmutableList<String> racks;
+    final List<String> nodes;
+    final List<String> racks;
     final Priority priority;
     final int containerCount;
         
-    public ContainerRequest(Resource capability, String[] hosts,
+    public ContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority, int containerCount) {
       this.capability = capability;
-      this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.priority = priority;
       this.containerCount = containerCount;
@@ -68,8 +81,8 @@ public interface AMRMClient<T extends AM
       return capability;
     }
     
-    public List<String> getHosts() {
-      return hosts;
+    public List<String> getNodes() {
+      return nodes;
     }
     
     public List<String> getRacks() {
@@ -103,9 +116,9 @@ public interface AMRMClient<T extends AM
    * AMRMClient can remove it from its internal store.
    */
   public static class StoredContainerRequest extends ContainerRequest {    
-    public StoredContainerRequest(Resource capability, String[] hosts,
+    public StoredContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority) {
-      super(capability, hosts, racks, priority, 1);
+      super(capability, nodes, racks, priority, 1);
     }
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sun Jun 16 22:18:30 2013
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.base.Joiner;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends Co
   
   //Key -> Priority
   //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Key->ResourceName (e.g., nodename, rackname, *)
   //Value->Map
   //Key->Resource Capability
   //Value->ResourceRequest
@@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends Co
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    RackResolver.init(conf);
     super.serviceInit(conf);
   }
 
@@ -309,22 +314,37 @@ public class AMRMClientImpl<T extends Co
   
   @Override
   public synchronized void addContainerRequest(T req) {
-    // Create resource requests
-    // add check for dup locations
-    if (req.hosts != null) {
-      for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability,
-            req.containerCount, req);
+    Set<String> allRacks = new HashSet<String>();
+    if (req.racks != null) {
+      allRacks.addAll(req.racks);
+      if(req.racks.size() != allRacks.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate racks: "
+            + joiner.join(req.racks));
       }
     }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability,
+    allRacks.addAll(resolveRacks(req.nodes));
+    
+    if (req.nodes != null) {
+      HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
+      if(dedupedNodes.size() != req.nodes.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate nodes: "
+            + joiner.join(req.nodes));        
+      }
+      for (String node : dedupedNodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        addResourceRequest(req.priority, node, req.capability,
             req.containerCount, req);
       }
     }
 
+    for (String rack : allRacks) {
+      addResourceRequest(req.priority, rack, req.capability,
+          req.containerCount, req);
+    }
+
     // Off-switch
     addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
         req.containerCount, req);
@@ -332,19 +352,23 @@ public class AMRMClientImpl<T extends Co
 
   @Override
   public synchronized void removeContainerRequest(T req) {
+    Set<String> allRacks = new HashSet<String>();
+    if (req.racks != null) {
+      allRacks.addAll(req.racks);
+    }
+    allRacks.addAll(resolveRacks(req.nodes));
+
     // Update resource requests
-    if (req.hosts != null) {
-      for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability,
+    if (req.nodes != null) {
+      for (String node : new HashSet<String>(req.nodes)) {
+        decResourceRequest(req.priority, node, req.capability,
             req.containerCount, req);
       }
     }
 
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability,
-            req.containerCount, req);
-      }
+    for (String rack : allRacks) {
+      decResourceRequest(req.priority, rack, req.capability,
+          req.containerCount, req);
     }
 
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
@@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends Co
     return list;          
   }
   
+  private Set<String> resolveRacks(List<String> nodes) {
+    Set<String> racks = new HashSet<String>();    
+    if (nodes != null) {
+      for (String node : nodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        String rack = RackResolver.resolve(node).getNetworkLocation();
+        if (rack == null) {
+          LOG.warn("Failed to resolve rack for node " + node + ".");
+        } else {
+          racks.add(rack);
+        }
+      }
+    }
+    
+    return racks;
+  }
+  
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // This code looks weird but is needed because of the following scenario.
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493601&r1=1493600&r2=1493601&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sun Jun 16 22:18:30 2013
@@ -267,6 +267,51 @@ public class TestAMRMClient {
     assertTrue(matches.size() == 1);
     assertTrue(matches.get(0).size() == matchSize);    
   }
+  
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Resource capability = Resource.newInstance(1024, 2);
+
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability, nodes, null, priority);
+      amClient.addContainerRequest(storedContainer1);
+
+      // verify matching with original node and inferred rack
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match node
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      // inferred match rack
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      
+      // inferred rack match no longer valid after request is removed
+      amClient.removeContainerRequest(storedContainer1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      assertTrue(matches.isEmpty());
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
 
   @Test (timeout=60000)
   public void testAMRMClientMatchStorage() throws YarnException, IOException {