You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/08/21 07:22:21 UTC

svn commit: r1619293 [2/2] - in /hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/...

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Thu Aug 21 05:22:10 2014
@@ -171,7 +171,6 @@ public class TestApplicationMasterServic
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    am1.setAMRMProtocol(rm.getApplicationMasterService());
 
     AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
     List<ContainerId> release = new ArrayList<ContainerId>();

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Thu Aug 21 05:22:10 2014
@@ -331,6 +331,10 @@ public class TestRMHA {
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToActive(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
+    
+    MyCountingDispatcher dispatcher =
+        (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    assertTrue(!dispatcher.isStopped());
 
     rm.adminService.transitionToActive(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
@@ -339,6 +343,11 @@ public class TestRMHA {
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
 
+    
+    // Keep the dispatcher reference before transitioning to standby
+    dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    
+    
     rm.adminService.transitionToStandby(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
         ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@@ -346,6 +355,8 @@ public class TestRMHA {
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
 
+    assertTrue(dispatcher.isStopped());
+    
     rm.stop();
   }
 
@@ -492,6 +503,8 @@ public class TestRMHA {
 
     private int eventHandlerCount;
 
+    private volatile boolean stopped = false;
+
     public MyCountingDispatcher() {
       super("MyCountingDispatcher");
       this.eventHandlerCount = 0;
@@ -510,5 +523,15 @@ public class TestRMHA {
     public int getEventHandlerCount() {
       return this.eventHandlerCount;
     }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      this.stopped = true;
+      super.serviceStop();
+    }
+
+    public boolean isStopped() {
+      return this.stopped;
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Aug 21 05:22:10 2014
@@ -289,7 +289,7 @@ public class TestRMRestart {
     
     // verify old AM is not accepted
     // change running AM to talk to new RM
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
     AllocateResponse allocResponse = am1.allocate(
         new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
@@ -1663,7 +1663,7 @@ public class TestRMRestart {
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     // recover app
     RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
     am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Thu Aug 21 05:22:10 2014
@@ -33,10 +33,13 @@ import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -72,6 +75,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import com.google.common.base.Supplier;
+
+
 @SuppressWarnings({"rawtypes", "unchecked"})
 @RunWith(value = Parameterized.class)
 public class TestWorkPreservingRMRestart {
@@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart
     rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
 
-    am0.setAMRMProtocol(rm2.getApplicationMasterService());
-    am0.registerAppAttempt(false);
+    am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am0.registerAppAttempt(true);
 
     rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
@@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart
     waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
   }
 
+  // Verifies that if the restarted RM receives the AM's release request for a
+  // container before the NM reports that container's status for recovery, the
+  // container is not recovered and the AM is told that it has completed.
+  @Test (timeout = 30000)
+  public void testReleasedContainerNotRecovered() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    rm1.start();
+
+    RMApp app1 = rm1.submitApp(1024);
+    final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Restart the RM against the same memStore (NM expiry set to 8000 ms).
+    conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am1.registerAppAttempt(true);
+
+    // Release container 2 before the NM has reported it for recovery.
+    final ContainerId runningContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    am1.allocate(null, Arrays.asList(runningContainer));
+
+    // Re-register the NM with its container statuses to trigger recovery.
+    List<NMContainerStatus> containerStatuses =
+        createNMContainerStatusForApp(am1);
+    nm1.registerNode(containerStatuses, null);
+
+    // Wait for at least one live container (the AM container) to be recovered.
+    waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
+
+    final AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    // The cached release request should be cleaned up; that is checked in the
+    // waitFor() below via getPendingRelease() on the application attempt.
+
+    AllocateResponse response = am1.allocate(null, null);
+    // The AM should be notified that the released container has completed.
+    boolean receivedCompletedContainer = false;
+    for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+      if (status.getContainerId().equals(runningContainer)) {
+        receivedCompletedContainer = true;
+      }
+    }
+    assertTrue(receivedCompletedContainer);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        // Done once the attempt's pending-release cache is empty and the
+        // released container has not been recovered as an RMContainer.
+        return scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
+          .getPendingRelease().isEmpty()
+            && scheduler.getRMContainer(runningContainer) == null;
+      }
+    }, 1000, 20000);
+  }
+
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,
@@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart
     assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
   }
 
-  private void waitForNumContainersToRecover(int num, MockRM rm,
+  public static void waitForNumContainersToRecover(int num, MockRM rm,
       ApplicationAttemptId attemptId) throws Exception {
     AbstractYarnScheduler scheduler =
         (AbstractYarnScheduler) rm.getResourceScheduler();
@@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart
       attempt = scheduler.getApplicationAttempt(attemptId);
     }
     while (attempt.getLiveContainers().size() < num) {
-      System.out.println("Wait for " + num + " containers to recover.");
+      System.out.println("Wait for " + num
+          + " containers to recover. currently: "
+          + attempt.getLiveContainers().size());
       Thread.sleep(200);
     }
   }