You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/08/31 01:03:44 UTC

svn commit: r1519112 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ hado...

Author: bikas
Date: Fri Aug 30 23:03:44 2013
New Revision: 1519112

URL: http://svn.apache.org/r1519112
Log:
Merge r1519107 from trunk to branch-2 for YARN-771. AMRMClient support for resource blacklisting (Junping Du via bikas)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Aug 30 23:03:44 2013
@@ -54,6 +54,9 @@ Release 2.1.1-beta - UNRELEASED
     YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
     vinodkv)
 
+    YARN-771. AMRMClient support for resource blacklisting (Junping Du via
+    bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Fri Aug 30 23:03:44 2013
@@ -286,4 +286,15 @@ public abstract class AMRMClient<T exten
                                            Priority priority, 
                                            String resourceName, 
                                            Resource capability);
+  
+  /**
+   * Update the application's blacklist by adding and/or removing resources.
+   * 
+   * @param blacklistAdditions list of resource names which should be added to
+   *        the application blacklist
+   * @param blacklistRemovals list of resource names which should be removed
+   *        from the application blacklist
+   */
+  public abstract void updateBlacklist(List<String> blacklistAdditions,
+      List<String> blacklistRemovals);
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Fri Aug 30 23:03:44 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -80,6 +81,9 @@ public class AMRMClientImpl<T extends Co
   protected Resource clusterAvailableResources;
   protected int clusterNodeCount;
   
+  protected final Set<String> blacklistAdditions = new HashSet<String>();
+  protected final Set<String> blacklistRemovals = new HashSet<String>();
+  
   class ResourceRequestInfo {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
@@ -199,9 +203,11 @@ public class AMRMClientImpl<T extends Co
     Preconditions.checkArgument(progressIndicator >= 0,
         "Progress indicator should not be negative");
     AllocateResponse allocateResponse = null;
-    ArrayList<ResourceRequest> askList = null;
-    ArrayList<ContainerId> releaseList = null;
+    List<ResourceRequest> askList = null;
+    List<ContainerId> releaseList = null;
     AllocateRequest allocateRequest = null;
+    List<String> blacklistToAdd = new ArrayList<String>();
+    List<String> blacklistToRemove = new ArrayList<String>();
     
     try {
       synchronized (this) {
@@ -217,9 +223,22 @@ public class AMRMClientImpl<T extends Co
         // optimistically clear this collection assuming no RPC failure
         ask.clear();
         release.clear();
+
+        blacklistToAdd.addAll(blacklistAdditions);
+        blacklistToRemove.addAll(blacklistRemovals);
+        
+        ResourceBlacklistRequest blacklistRequest = 
+            (blacklistToAdd != null) || (blacklistToRemove != null) ? 
+            ResourceBlacklistRequest.newInstance(blacklistToAdd,
+                blacklistToRemove) : null;
+        
         allocateRequest =
             AllocateRequest.newInstance(lastResponseId, progressIndicator,
-              askList, releaseList, null);
+              askList, releaseList, blacklistRequest);
+        // clear blacklistAdditions and blacklistRemovals before 
+        // unsynchronized part
+        blacklistAdditions.clear();
+        blacklistRemovals.clear();
       }
 
       allocateResponse = rmClient.allocate(allocateRequest);
@@ -253,6 +272,9 @@ public class AMRMClientImpl<T extends Co
               ask.add(oldAsk);
             }
           }
+          
+          blacklistAdditions.addAll(blacklistToAdd);
+          blacklistRemovals.addAll(blacklistToRemove);
         }
       }
     }
@@ -604,4 +626,31 @@ public class AMRMClientImpl<T extends Co
           + " #asks=" + ask.size());
     }
   }
+
+  @Override
+  public synchronized void updateBlacklist(List<String> blacklistAdditions,
+      List<String> blacklistRemovals) {
+    if (blacklistAdditions != null) {
+      this.blacklistAdditions.addAll(blacklistAdditions);
+      // if some resources are also in blacklistRemovals updated before, we 
+      // should remove them here.
+      this.blacklistRemovals.removeAll(blacklistAdditions);
+    }
+    
+    if (blacklistRemovals != null) {
+      this.blacklistRemovals.addAll(blacklistRemovals);
+      // if some resources are in blacklistAdditions before, we should remove
+      // them here.
+      this.blacklistAdditions.removeAll(blacklistRemovals);
+    }
+    
+    // Resources may appear in both lists of one invocation, but warn about it.
+    // disjoint() only reads the caller's lists; removeAll() would modify them.
+    if (blacklistAdditions != null && blacklistRemovals != null
+        && !java.util.Collections.disjoint(blacklistAdditions,
+            blacklistRemovals)) {
+      LOG.warn("The same resources appear in both blacklistAdditions and " +
+          "blacklistRemovals in updateBlacklist.");
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Fri Aug 30 23:03:44 2013
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.yarn.client.api.impl;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,6 +100,7 @@ public class TestAMRMClient {
   static String rack;
   static String[] nodes;
   static String[] racks;
+  private final static int DEFAULT_ITERATION = 3;
   
   @BeforeClass
   public static void setup() throws Exception {
@@ -476,6 +480,144 @@ public class TestAMRMClient {
       }
     }
   }
+  
+  /** Blacklisted node gets no containers; failed allocate keeps blacklist. */
+  @Test (timeout=60000)
+  public void testAllocationWithBlacklist() throws YarnException, IOException {
+    AMRMClientImpl<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient =
+          (AMRMClientImpl<ContainerRequest>) AMRMClient
+            .<ContainerRequest> createAMRMClient();
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      ContainerRequest storedContainer1 = 
+          new ContainerRequest(capability, nodes, racks, priority);
+      amClient.addContainerRequest(storedContainer1);
+      assertEquals(3, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      List<String> localNodeBlacklist = new ArrayList<String>();
+      localNodeBlacklist.add(node);
+
+      // put node in black list, so no container assignment
+      amClient.updateBlacklist(localNodeBlacklist, null);
+      int allocatedContainerCount = getAllocatedContainersNumber(amClient,
+        DEFAULT_ITERATION);
+      // the only node is in blacklist, so no allocation
+      assertEquals(0, allocatedContainerCount);
+
+      // Remove node from blacklist, so get assigned with 2
+      amClient.updateBlacklist(null, localNodeBlacklist);
+      ContainerRequest storedContainer2 = 
+              new ContainerRequest(capability, nodes, racks, priority);
+      amClient.addContainerRequest(storedContainer2);
+      allocatedContainerCount = getAllocatedContainersNumber(amClient,
+          DEFAULT_ITERATION);
+      assertEquals(2, allocatedContainerCount);
+
+      // Test in case exception in allocate(), blacklist is kept
+      assertTrue(amClient.blacklistAdditions.isEmpty());
+      assertTrue(amClient.blacklistRemovals.isEmpty());
+
+      // create an invalid ContainerRequest - memory value is negative
+      ContainerRequest invalidContainerRequest = 
+          new ContainerRequest(Resource.newInstance(-1024, 1),
+              nodes, racks, priority);
+      amClient.addContainerRequest(invalidContainerRequest);
+      amClient.updateBlacklist(localNodeBlacklist, null);
+      try {
+        // allocate() should complain as ContainerRequest is invalid.
+        amClient.allocate(0.1f);
+        fail("there should be an exception here.");
+      } catch (Exception e) {
+        assertEquals(1, amClient.blacklistAdditions.size());
+      }
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+  
+  /** Checks how updateBlacklist() merges pending additions and removals. */
+  @Test (timeout=60000)
+  public void testAMRMClientWithBlacklist() throws YarnException, IOException {
+    AMRMClientImpl<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient =
+          (AMRMClientImpl<ContainerRequest>) AMRMClient
+            .<ContainerRequest> createAMRMClient();
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      String[] nodes = {"node1", "node2", "node3"};
+      // Add nodes[0] and nodes[1]
+      List<String> nodeList01 = new ArrayList<String>();
+      nodeList01.add(nodes[0]);
+      nodeList01.add(nodes[1]);
+      amClient.updateBlacklist(nodeList01, null);
+      assertEquals(2, amClient.blacklistAdditions.size());
+      assertEquals(0, amClient.blacklistRemovals.size());
+
+      // Add nodes[0] again, verify it is not added duplicated.
+      List<String> nodeList02 = new ArrayList<String>();
+      nodeList02.add(nodes[0]);
+      nodeList02.add(nodes[2]);
+      amClient.updateBlacklist(nodeList02, null);
+      assertEquals(3, amClient.blacklistAdditions.size());
+      assertEquals(0, amClient.blacklistRemovals.size());
+
+      // Add nodes[1] and nodes[2] to removal list, 
+      // Verify addition list remove these two nodes.
+      List<String> nodeList12 = new ArrayList<String>();
+      nodeList12.add(nodes[1]);
+      nodeList12.add(nodes[2]);
+      amClient.updateBlacklist(null, nodeList12);
+      assertEquals(1, amClient.blacklistAdditions.size());
+      assertEquals(2, amClient.blacklistRemovals.size());
+
+      // Add nodes[1] again to addition list, 
+      // Verify removal list will remove this node.
+      List<String> nodeList1 = new ArrayList<String>();
+      nodeList1.add(nodes[1]);
+      amClient.updateBlacklist(nodeList1, null);
+      assertEquals(2, amClient.blacklistAdditions.size());
+      assertEquals(1, amClient.blacklistRemovals.size());
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  /** Sums containers allocated over iterationsLeft allocate() calls. */
+  private int getAllocatedContainersNumber(
+      AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft)
+      throws YarnException, IOException {
+    int total = 0;
+    for (int remaining = iterationsLeft - 1; remaining >= 0; remaining--) {
+      Log.info(" == alloc " + total + " it left " + remaining);
+      AllocateResponse response = amClient.allocate(0.1f);
+      // the pending ask and release lists must be empty after allocate()
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(amClient.release.size() == 0);
+      assertTrue(nodeCount == amClient.getClusterNodeCount());
+      total += response.getAllocatedContainers().size();
+      if (total == 0) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+    return total;
+  }
 
   @Test (timeout=60000)
   public void testAMRMClient() throws YarnException, IOException {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java Fri Aug 30 23:03:44 2013
@@ -125,7 +125,7 @@ public class ResourceBlacklistRequestPBI
 
   @Override
   public void setBlacklistAdditions(List<String> resourceNames) {
-    if (resourceNames == null) {
+    if (resourceNames == null || resourceNames.isEmpty()) {
       if (this.blacklistAdditions != null) {
         this.blacklistAdditions.clear();
       }
@@ -144,7 +144,7 @@ public class ResourceBlacklistRequestPBI
 
   @Override
   public void setBlacklistRemovals(List<String> resourceNames) {
-    if (resourceNames == null) {
+    if (resourceNames == null || resourceNames.isEmpty()) {
       if (this.blacklistRemovals != null) {
         this.blacklistRemovals.clear();
       }