You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2014/01/22 23:08:04 UTC

svn commit: r1560533 - in /hadoop/common/trunk/hadoop-yarn-project: CHANGES.txt hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

Author: sandy
Date: Wed Jan 22 22:08:04 2014
New Revision: 1560533

URL: http://svn.apache.org/r1560533
Log:
YARN-1607. TestRM relies on the scheduler assigning multiple containers in a single node update (Sandy Ryza)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1560533&r1=1560532&r2=1560533&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jan 22 22:08:04 2014
@@ -357,6 +357,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1606. Fix the default value of yarn.resourcemanager.zk-timeout-ms 
     in yarn-default.xml (kasha)
 
+    YARN-1607. TestRM relies on the scheduler assigning multiple containers in
+    a single node update (Sandy Ryza)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1560533&r1=1560532&r2=1560533&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Jan 22 22:08:04 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -56,6 +59,9 @@ public class TestRM {
 
   private static final Log LOG = LogFactory.getLog(TestRM.class);
 
+  // Milliseconds to sleep for when waiting for something to happen
+  private final static int WAIT_SLEEP_MS = 100;
+  
   @Test
   public void testGetNewAppId() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -69,7 +75,7 @@ public class TestRM {
     rm.stop();
   }
   
-  @Test
+  @Test (timeout = 30000)
   public void testAppWithNoContainers() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
@@ -91,7 +97,7 @@ public class TestRM {
     rm.stop();
   }
 
-  @Test
+  @Test (timeout = 30000)
   public void testAppOnMultiNode() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
@@ -116,30 +122,30 @@ public class TestRM {
     am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
     
     //kick the scheduler
-    nm1.nodeHeartbeat(true);
     List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
     int contReceived = conts.size();
     while (contReceived < 3) {//only 3 containers are available on node1
+      nm1.nodeHeartbeat(true);
       conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers());
       contReceived = conts.size();
       LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
-      Thread.sleep(2000);
+      Thread.sleep(WAIT_SLEEP_MS);
     }
     Assert.assertEquals(3, conts.size());
 
     //send node2 heartbeat
-    nm2.nodeHeartbeat(true);
     conts = am.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
     contReceived = conts.size();
     while (contReceived < 10) {
+      nm2.nodeHeartbeat(true);
       conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers());
       contReceived = conts.size();
       LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
-      Thread.sleep(2000);
+      Thread.sleep(WAIT_SLEEP_MS);
     }
     Assert.assertEquals(10, conts.size());
 
@@ -150,7 +156,7 @@ public class TestRM {
     rm.stop();
   }
   
-  @Test
+  @Test (timeout = 40000)
   public void testNMToken() throws Exception {
     MockRM rm = new MockRM();
     try {
@@ -187,19 +193,17 @@ public class TestRM {
       // initially requesting 2 containers.
       AllocateResponse response =
           am.allocate("h1", 1000, 2, releaseContainerList);
-      nm1.nodeHeartbeat(true);
       Assert.assertEquals(0, response.getAllocatedContainers().size());
       allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
-          nmTokens);
+          nmTokens, nm1);
       Assert.assertEquals(1, nmTokens.size());
 
       
       // requesting 2 more containers.
       response = am.allocate("h1", 1000, 2, releaseContainerList);
-      nm1.nodeHeartbeat(true);
       Assert.assertEquals(0, response.getAllocatedContainers().size());
       allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
-          nmTokens);
+          nmTokens, nm1);
       Assert.assertEquals(1, nmTokens.size());
       
       
@@ -211,23 +215,27 @@ public class TestRM {
           new ArrayList<Container>();
       
       response = am.allocate("h2", 1000, 2, releaseContainerList);
-      nm2.nodeHeartbeat(true);
       Assert.assertEquals(0, response.getAllocatedContainers().size());
       allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
-          nmTokens);
+          nmTokens, nm2);
       Assert.assertEquals(2, nmTokens.size());
       
       // Simulating NM-2 restart.
       nm2 = rm.registerNode("h2:1234", 10000);
-      nm2.nodeHeartbeat(true);
-      
+      // Wait for reconnect to make it through the RM and create a new RMNode
+      Map<NodeId, RMNode> nodes = rm.getRMContext().getRMNodes();
+      while (nodes.get(nm2.getNodeId()).getLastNodeHeartBeatResponse()
+          .getResponseId() > 0) {
+        Thread.sleep(WAIT_SLEEP_MS);
+      }
+
       int interval = 40;
       // Wait for nm Token to be cleared.
       while (nmTokenSecretManager
           .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
               nm2.getNodeId()) && interval-- > 0) {
         LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
-        Thread.sleep(1000);
+        Thread.sleep(WAIT_SLEEP_MS);
       }
       Assert.assertTrue(nmTokenSecretManager
           .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@@ -238,10 +246,9 @@ public class TestRM {
       
       // We should again receive the NMToken.
       response = am.allocate("h2", 1000, 2, releaseContainerList);
-      nm2.nodeHeartbeat(true);
       Assert.assertEquals(0, response.getAllocatedContainers().size());
       allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
-          nmTokens);
+          nmTokens, nm2);
       Assert.assertEquals(2, nmTokens.size());
 
       // Now rolling over NMToken masterKey. it should resend the NMToken in
@@ -270,10 +277,9 @@ public class TestRM {
       Assert.assertEquals(0, nmTokens.size());
       // We should again receive the NMToken.
       response = am.allocate("h2", 1000, 1, releaseContainerList);
-      nm2.nodeHeartbeat(true);
       Assert.assertEquals(0, response.getAllocatedContainers().size());
       allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
-          nmTokens);
+          nmTokens, nm2);
       Assert.assertEquals(1, nmTokens.size());
       Assert.assertTrue(nmTokenSecretManager
           .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
@@ -305,12 +311,14 @@ public class TestRM {
 
   protected void allocateContainersAndValidateNMTokens(MockAM am,
       ArrayList<Container> containersReceived, int totalContainerRequested,
-      HashMap<String, Token> nmTokens) throws Exception, InterruptedException {
+      HashMap<String, Token> nmTokens, MockNM nm) throws Exception,
+      InterruptedException {
     ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>();
     AllocateResponse response;
     ArrayList<ResourceRequest> resourceRequest =
         new ArrayList<ResourceRequest>();      
     while (containersReceived.size() < totalContainerRequested) {
+      nm.nodeHeartbeat(true);
       LOG.info("requesting containers..");
       response =
           am.allocate(resourceRequest, releaseContainerList);
@@ -326,7 +334,7 @@ public class TestRM {
       }
       LOG.info("Got " + containersReceived.size()
           + " containers. Waiting to get " + totalContainerRequested);
-      Thread.sleep(500);
+      Thread.sleep(WAIT_SLEEP_MS);
     }
   }