You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [19/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Tue Aug 19 23:49:39 2014
@@ -18,26 +18,34 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -45,13 +53,29 @@ import org.apache.hadoop.yarn.server.uti
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestApplicationCleanup {
 
   private static final Log LOG = LogFactory
     .getLog(TestApplicationCleanup.class);
+  
+  private YarnConfiguration conf;
+  
+  @Before
+  public void setup() throws UnknownHostException {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
+  }
 
+  @SuppressWarnings("resource")
   @Test
   public void testAppCleanup() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -130,6 +154,7 @@ public class TestApplicationCleanup {
     rm.stop();
   }
 
+  @SuppressWarnings("resource")
   @Test
   public void testContainerCleanup() throws Exception {
 
@@ -207,20 +232,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    List<ContainerId> contsToClean = resp.getContainersToCleanup();
-    int cleanedConts = contsToClean.size();
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
-      dispatcher.await();
-      contsToClean = resp.getContainersToCleanup();
-      cleanedConts += contsToClean.size();
-    }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
-    Assert.assertEquals(1, cleanedConts);
+    waitForContainerCleanup(dispatcher, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -233,24 +245,237 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    contsToClean = resp.getContainersToCleanup();
-    cleanedConts = contsToClean.size();
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
+    waitForContainerCleanup(dispatcher, nm1, resp);
+
+    rm.stop();
+  }
+
+  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+      NodeHeartbeatResponse resp) throws Exception {
+    int waitCount = 0, cleanedConts = 0;
+    List<ContainerId> contsToClean;
+    do {
       dispatcher.await();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
+      if (cleanedConts >= 1) {
+        break;
+      }
+      Thread.sleep(100);
+      resp = nm.nodeHeartbeat(true);
+    } while(waitCount++ < 200);
+
+    if (contsToClean.isEmpty()) {
+      LOG.error("Failed to get any containers to cleanup");
+    } else {
+      LOG.info("Got cleanup for " + contsToClean.get(0));
     }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
     Assert.assertEquals(1, cleanedConts);
+  }
 
-    rm.stop();
+  private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
+      throws Exception {
+    while (true) {
+      NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
+      if (response.getApplicationsToCleanup() != null
+          && response.getApplicationsToCleanup().size() == 1
+          && appId.equals(response.getApplicationsToCleanup().get(0))) {
+        return;
+      }
+
+      LOG.info("Haven't got application=" + appId.toString()
+          + " in cleanup list from node heartbeat response, "
+          + "sleep for a while before next heartbeat");
+      Thread.sleep(1000);
+    }
+  }
+  
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return am;
+  }
+  
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+    // start new RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    
+    // nm1 register to rm2, and do a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+    
+    // wait for application cleanup message received
+    waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+    
+    rm1.stop();
+    rm2.stop();
+  }
+  
+  @SuppressWarnings("resource")
+  @Test(timeout = 60000)
+  public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    MockNM nm2 =
+        new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService());
+    nm2.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+
+    // alloc another container on nm2
+    AllocateResponse allocResponse =
+        am0.allocate(Arrays.asList(ResourceRequest.newInstance(
+            Priority.newInstance(1), "*", Resource.newInstance(1024, 0), 1)),
+            null);
+    while (null == allocResponse.getAllocatedContainers()
+        || allocResponse.getAllocatedContainers().isEmpty()) {
+      nm2.nodeHeartbeat(true);
+      allocResponse = am0.allocate(null, null);
+      Thread.sleep(1000);
+    }
+
+    // start new RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    // nm1/nm2 register to rm2, and do a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance(
+      ContainerId.newInstance(am0.getApplicationAttemptId(), 1),
+      ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0,
+      Priority.newInstance(0), 1234)), Arrays.asList(app0.getApplicationId()));
+    nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm2.registerNode(Arrays.asList(app0.getApplicationId()));
+
+    // assert app state has been saved.
+    rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+    // wait for application cleanup message received on NM1
+    waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+    // wait for application cleanup message received on NM2
+    waitForAppCleanupMessageRecved(nm2, app0.getApplicationId());
+
+    rm1.stop();
+    rm2.stop();
+  }
+
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+      Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // start new RM
+    final DrainDispatcher dispatcher2 = new DrainDispatcher();
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher2;
+      }
+    };
+    rm2.start();
+
+    // nm1 register to rm2, and do a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Add unknown container for application unknown to scheduler
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+        .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+    waitForContainerCleanup(dispatcher2, nm1, response);
+
+    rm1.stop();
+    rm2.stop();
+  }
+
+  @Test (timeout = 60000)
+  public void testAppCleanupWhenNMReconnects() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+    // wait for application cleanup message received
+    waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+    // reconnect NM with application still active
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+    rm1.stop();
   }
 
   public static void main(String[] args) throws Exception {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -194,28 +195,17 @@ public class TestApplicationMasterLaunch
 
     // request for containers
     int request = 2;
-    try {
-      AllocateResponse ar =
-          am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
-    } catch (Exception e) {
-      Assert.assertEquals("Application Master is trying to allocate before "
-          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
-      thrown = true;
-    }
+    AllocateResponse ar =
+        am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+    Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
     // kick the scheduler
     nm1.nodeHeartbeat(true);
-    try {
-      AllocateResponse amrs =
-          am.allocate(new ArrayList<ResourceRequest>(),
-            new ArrayList<ContainerId>());
-    } catch (Exception e) {
-      Assert.assertEquals("Application Master is trying to allocate before "
-          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
-      thrown = true;
-    }
-    Assert.assertTrue(thrown);
+    AllocateResponse amrs =
+        am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>());
+    Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
     am.registerAppAttempt();
     thrown = false;
     try {
@@ -228,5 +218,17 @@ public class TestApplicationMasterLaunch
       thrown = true;
     }
     Assert.assertTrue(thrown);
+
+    // Simulate an AM that was disconnected and app attempt was removed
+    // (responseMap does not contain attemptid)
+    am.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+
+    AllocateResponse amrs2 =
+        am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>());
+    Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Tue Aug 19 23:49:39 2014
@@ -18,60 +18,33 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import com.google.common.collect.Maps;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 
 import static java.lang.Thread.sleep;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.*;
 
 public class TestApplicationMasterService {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -198,7 +171,6 @@ public class TestApplicationMasterServic
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    am1.setAMRMProtocol(rm.getApplicationMasterService());
 
     AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
     List<ContainerId> release = new ArrayList<ContainerId>();
@@ -270,13 +242,17 @@ public class TestApplicationMasterServic
       }
       Assert.assertNotNull(cause);
       Assert
-          .assertTrue(cause instanceof InvalidApplicationMasterRequestException);
+          .assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
       Assert.assertNotNull(cause.getMessage());
       Assert
           .assertTrue(cause
               .getMessage()
               .contains(
                   "Application Master is trying to unregister before registering for:"));
+
+      am1.registerAppAttempt();
+
+      am1.unregisterAppAttempt(req, false);
     } finally {
       if (rm != null) {
         rm.stop();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Tue Aug 19 23:49:39 2014
@@ -44,16 +44,17 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CyclicBarrier;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -137,6 +139,10 @@ public class TestClientRMService {
   
   private final static String QUEUE_1 = "Q-1";
   private final static String QUEUE_2 = "Q-2";
+  private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT";
+  static {
+    KerberosName.setRules(kerberosRule);
+  }
   
   @BeforeClass
   public static void setupSecretManager() throws IOException {
@@ -259,6 +265,28 @@ public class TestClientRMService {
   }
 
   @Test
+  public void testGetApplicationResourceUsageReportDummy() throws YarnException,
+      IOException {
+    ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+    YarnScheduler yarnScheduler = mockYarnScheduler();
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+        new EventHandler<Event>() {
+          public void handle(Event event) {
+          }
+        });
+    ApplicationSubmissionContext asContext = 
+        mock(ApplicationSubmissionContext.class);
+    YarnConfiguration config = new YarnConfiguration();
+    RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
+        rmContext, yarnScheduler, null, asContext, config, false);
+    ApplicationResourceUsageReport report = rmAppAttemptImpl
+        .getApplicationResourceUsageReport();
+    assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
+  }
+
+  @Test
   public void testGetApplicationAttempts() throws YarnException, IOException {
     ClientRMService rmService = createRMService();
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -456,6 +484,17 @@ public class TestClientRMService {
       UserGroupInformation.createRemoteUser("owner");
   private static final UserGroupInformation other =
       UserGroupInformation.createRemoteUser("other");
+  private static final UserGroupInformation tester =
+      UserGroupInformation.createRemoteUser("tester");
+  private static final String testerPrincipal = "tester@EXAMPLE.COM";
+  private static final String ownerPrincipal = "owner@EXAMPLE.COM";
+  private static final String otherPrincipal = "other@EXAMPLE.COM";
+  private static final UserGroupInformation testerKerb =
+      UserGroupInformation.createRemoteUser(testerPrincipal);
+  private static final UserGroupInformation ownerKerb =
+      UserGroupInformation.createRemoteUser(ownerPrincipal);
+  private static final UserGroupInformation otherKerb =
+      UserGroupInformation.createRemoteUser(otherPrincipal);
   
   @Test
   public void testTokenRenewalByOwner() throws Exception {
@@ -478,9 +517,8 @@ public class TestClientRMService {
             checkTokenRenewal(owner, other);
             return null;
           } catch (YarnException ex) {
-            Assert.assertTrue(ex.getMessage().contains(
-                "Client " + owner.getUserName() +
-                " tries to renew a token with renewer specified as " +
+            Assert.assertTrue(ex.getMessage().contains(owner.getUserName() +
+                " tries to renew a token with renewer " +
                 other.getUserName()));
             throw ex;
           }
@@ -524,6 +562,147 @@ public class TestClientRMService {
     rmService.renewDelegationToken(request);
   }
 
+  @Test
+  public void testTokenCancellationByOwner() throws Exception {
+    // two tests required - one with a kerberos name
+    // and with a short name
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(rmService, testerKerb, other);
+        return null;
+      }
+    });
+    owner.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(owner, other);
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testTokenCancellationByRenewer() throws Exception {
+    // two tests required - one with a kerberos name
+    // and with a short name
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(rmService, owner, testerKerb);
+        return null;
+      }
+    });
+    other.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        checkTokenCancellation(owner, other);
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testTokenCancellationByWrongUser() {
+    // two sets to test -
+    // 1. try to cancel tokens of short and kerberos users as a kerberos UGI
+    // 2. try to cancel tokens of short and kerberos users as a simple auth UGI
+
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    UserGroupInformation[] kerbTestOwners =
+        { owner, other, tester, ownerKerb, otherKerb };
+    UserGroupInformation[] kerbTestRenewers =
+        { owner, other, ownerKerb, otherKerb };
+    for (final UserGroupInformation tokOwner : kerbTestOwners) {
+      for (final UserGroupInformation tokRenewer : kerbTestRenewers) {
+        try {
+          testerKerb.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              try {
+                checkTokenCancellation(rmService, tokOwner, tokRenewer);
+                Assert.fail("We should not reach here; token owner = "
+                    + tokOwner.getUserName() + ", renewer = "
+                    + tokRenewer.getUserName());
+                return null;
+              } catch (YarnException e) {
+                Assert.assertTrue(e.getMessage().contains(
+                  testerKerb.getUserName()
+                      + " is not authorized to cancel the token"));
+                return null;
+              }
+            }
+          });
+        } catch (Exception e) {
+          Assert.fail("Unexpected exception; " + e.getMessage());
+        }
+      }
+    }
+
+    UserGroupInformation[] simpleTestOwners =
+        { owner, other, ownerKerb, otherKerb, testerKerb };
+    UserGroupInformation[] simpleTestRenewers =
+        { owner, other, ownerKerb, otherKerb };
+    for (final UserGroupInformation tokOwner : simpleTestOwners) {
+      for (final UserGroupInformation tokRenewer : simpleTestRenewers) {
+        try {
+          tester.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              try {
+                checkTokenCancellation(tokOwner, tokRenewer);
+                Assert.fail("We should not reach here; token owner = "
+                    + tokOwner.getUserName() + ", renewer = "
+                    + tokRenewer.getUserName());
+                return null;
+              } catch (YarnException ex) {
+                Assert.assertTrue(ex.getMessage().contains(
+                  tester.getUserName()
+                      + " is not authorized to cancel the token"));
+                return null;
+              }
+            }
+          });
+        } catch (Exception e) {
+          Assert.fail("Unexpected exception; " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  private void checkTokenCancellation(UserGroupInformation owner,
+      UserGroupInformation renewer) throws IOException, YarnException {
+    RMContext rmContext = mock(RMContext.class);
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, null, null, null, null, dtsm);
+    checkTokenCancellation(rmService, owner, renewer);
+  }
+
+  private void checkTokenCancellation(ClientRMService rmService,
+      UserGroupInformation owner, UserGroupInformation renewer)
+      throws IOException, YarnException {
+    RMDelegationTokenIdentifier tokenIdentifier =
+        new RMDelegationTokenIdentifier(new Text(owner.getUserName()),
+          new Text(renewer.getUserName()), null);
+    Token<?> token =
+        new Token<RMDelegationTokenIdentifier>(tokenIdentifier, dtsm);
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    CancelDelegationTokenRequest request =
+        Records.newRecord(CancelDelegationTokenRequest.class);
+    request.setDelegationToken(dToken);
+    rmService.cancelDelegationToken(request);
+  }
+
   @Test (timeout = 30000)
   @SuppressWarnings ("rawtypes")
   public void testAppSubmit() throws Exception {
@@ -647,7 +826,8 @@ public class TestClientRMService {
     ApplicationId[] appIds =
         {getApplicationId(101), getApplicationId(102), getApplicationId(103)};
     List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-
+    
+    long[] submitTimeMillis = new long[3];
     // Submit applications
     for (int i = 0; i < appIds.length; i++) {
       ApplicationId appId = appIds[i];
@@ -657,6 +837,7 @@ public class TestClientRMService {
           appId, appNames[i], queues[i % queues.length],
           new HashSet<String>(tags.subList(0, i + 1)));
       rmService.submitApplication(submitRequest);
+      submitTimeMillis[i] = System.currentTimeMillis();
     }
 
     // Test different cases of ClientRMService#getApplications()
@@ -668,6 +849,24 @@ public class TestClientRMService {
     request.setLimit(1L);
     assertEquals("Failed to limit applications", 1,
         rmService.getApplications(request).getApplicationList().size());
+    
+    // Check start range
+    request = GetApplicationsRequest.newInstance();
+    request.setStartRange(submitTimeMillis[0], System.currentTimeMillis());
+    
+    // 2 applications are submitted after first timeMills
+    assertEquals("Incorrect number of matching start range", 
+        2, rmService.getApplications(request).getApplicationList().size());
+    
+    // 1 application is submitted after the second timeMills
+    request.setStartRange(submitTimeMillis[1], System.currentTimeMillis());
+    assertEquals("Incorrect number of matching start range", 
+        1, rmService.getApplications(request).getApplicationList().size());
+    
+    // no application is submitted after the third timeMills
+    request.setStartRange(submitTimeMillis[2], System.currentTimeMillis());
+    assertEquals("Incorrect number of matching start range", 
+        0, rmService.getApplications(request).getApplicationList().size());
 
     // Check queue
     request = GetApplicationsRequest.newInstance();
@@ -945,6 +1144,8 @@ public class TestClientRMService {
         Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
     when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
         Arrays.asList(getApplicationAttemptId(103)));
+    ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+    when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
     return yarnScheduler;
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 
+import org.apache.hadoop.net.NetUtils;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -235,8 +236,8 @@ public class TestClientRMTokens {
   @Test
   public void testShortCircuitRenewCancel()
       throws IOException, InterruptedException {
-    InetSocketAddress addr =
-        new InetSocketAddress(InetAddress.getLocalHost(), 123);
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        InetAddress.getLocalHost().getHostName(), 123, null);
     checkShortCircuitRenewCancel(addr, addr, true);
   }
 
@@ -244,17 +245,19 @@ public class TestClientRMTokens {
   public void testShortCircuitRenewCancelWildcardAddress()
       throws IOException, InterruptedException {
     InetSocketAddress rmAddr = new InetSocketAddress(123);
+    InetSocketAddress serviceAddr = NetUtils.createSocketAddr(
+        InetAddress.getLocalHost().getHostName(), rmAddr.getPort(), null);
     checkShortCircuitRenewCancel(
         rmAddr,
-        new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+        serviceAddr,
         true);
   }
 
   @Test
   public void testShortCircuitRenewCancelSameHostDifferentPort()
       throws IOException, InterruptedException {
-    InetSocketAddress rmAddr =
-        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    InetSocketAddress rmAddr = NetUtils.createSocketAddr(
+        InetAddress.getLocalHost().getHostName(), 123, null);
     checkShortCircuitRenewCancel(
         rmAddr,
         new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
@@ -264,8 +267,8 @@ public class TestClientRMTokens {
   @Test
   public void testShortCircuitRenewCancelDifferentHostSamePort()
       throws IOException, InterruptedException {
-    InetSocketAddress rmAddr =
-        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    InetSocketAddress rmAddr = NetUtils.createSocketAddr(
+        InetAddress.getLocalHost().getHostName(), 123, null);
     checkShortCircuitRenewCancel(
         rmAddr,
         new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
@@ -275,8 +278,8 @@ public class TestClientRMTokens {
   @Test
   public void testShortCircuitRenewCancelDifferentHostDifferentPort()
       throws IOException, InterruptedException {
-    InetSocketAddress rmAddr =
-        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    InetSocketAddress rmAddr = NetUtils.createSocketAddr(
+        InetAddress.getLocalHost().getHostName(), 123, null);
     checkShortCircuitRenewCancel(
         rmAddr,
         new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Tue Aug 19 23:49:39 2014
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -52,21 +53,21 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
   
   private final int GB = 1024;
   private static YarnConfiguration conf;
-  
+
   @BeforeClass
   public static void setup() {
     conf = new YarnConfiguration();
@@ -76,12 +77,12 @@ public class TestFifoScheduler {
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
-    ResourceScheduler scheduler = new FifoScheduler();
+    FifoScheduler scheduler = new FifoScheduler();
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min memory allocation is" +
         " larger than the max memory allocation.");
     } catch (YarnRuntimeException e) {
@@ -213,6 +214,35 @@ public class TestFifoScheduler {
     rm.stop();
   }
 
+  @Test
+  public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
+    FifoScheduler scheduler = new FifoScheduler();
+    MockRM rm = new MockRM(conf);
+    scheduler.setRMContext(rm.getRMContext());
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, rm.getRMContext());
+
+    RMNode node = MockNodes.newNodeInfo(1,
+            Resources.createResource(1024, 4), 1, "127.0.0.1");
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    scheduler.addApplication(appId, "queue1", "user1", false);
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    try {
+      scheduler.handle(updateEvent);
+    } catch (NullPointerException e) {
+        Assert.fail();
+    }
+
+    ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
+    scheduler.addApplicationAttempt(attId, false, false);
+
+    rm.stop();
+  }
+
   private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
       throws Exception {
     MockRM rm = new MockRM(conf);
@@ -266,7 +296,12 @@ public class TestFifoScheduler {
     conf.setQueues("default", new String[] {"default"});
     conf.setCapacity("default", 100);
     FifoScheduler fs = new FifoScheduler();
+    fs.init(conf);
+    fs.start();
+    // mock rmContext to avoid NPE.
+    RMContext context = mock(RMContext.class);
     fs.reinitialize(conf, null);
+    fs.setRMContext(context);
 
     RMNode n1 =
         MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
@@ -286,6 +321,7 @@ public class TestFifoScheduler {
     fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+    fs.stop();
   }
   
   @Test (timeout = 50000)

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Tue Aug 19 23:49:39 2014
@@ -43,10 +43,11 @@ import org.junit.Test;
 public class TestMoveApplication {
   private ResourceManager resourceManager = null;
   private static boolean failMove;
-  
+  private Configuration conf;
+
   @Before
   public void setUp() throws Exception {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
         FifoSchedulerWithMove.class);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
     }
   }
   
-  @Test (timeout = 5000)
-  public void testMoveSuccessful() throws Exception {
-    // Submit application
-    Application application = new Application("user1", resourceManager);
-    ApplicationId appId = application.getApplicationId();
-    application.submit();
-    
-    // Wait for app to be accepted
-    RMApp app = resourceManager.rmContext.getRMApps().get(appId);
-    while (app.getState() != RMAppState.ACCEPTED) {
-      Thread.sleep(100);
-    }
-
-    ClientRMService clientRMService = resourceManager.getClientRMService();
+  @Test (timeout = 10000)
+      public
+      void testMoveSuccessful() throws Exception {
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app = rm1.submitApp(1024);
+    ClientRMService clientRMService = rm1.getClientRMService();
     // FIFO scheduler does not support moves
-    clientRMService.moveApplicationAcrossQueues(
-        MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-    
-    RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+    clientRMService
+      .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+        .newInstance(app.getApplicationId(), "newqueue"));
+
+    RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
     assertEquals("newqueue", rmApp.getQueue());
+    rm1.stop();
   }
-  
+
   @Test
   public void testMoveRejectedByPermissions() throws Exception {
     failMove = true;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.junit.After;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -78,7 +81,14 @@ public class TestRM {
 
   // Milliseconds to sleep for when waiting for something to happen
   private final static int WAIT_SLEEP_MS = 100;
-  
+
+  @After
+  public void tearDown() {
+    ClusterMetrics.destroy();
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.shutdown();
+  }
+
   @Test
   public void testGetNewAppId() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java Tue Aug 19 23:49:39 2014
@@ -348,14 +348,14 @@ public class TestRMAdminService {
 
     rm.adminService.refreshSuperUserGroupsConfiguration(
         RefreshSuperUserGroupsConfigurationRequest.newInstance());
-    Assert.assertTrue(ProxyUsers.getProxyGroups()
+    Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
         .get("hadoop.proxyuser.test.groups").size() == 1);
-    Assert.assertTrue(ProxyUsers.getProxyGroups()
+    Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
         .get("hadoop.proxyuser.test.groups").contains("test_groups"));
 
-    Assert.assertTrue(ProxyUsers.getProxyHosts()
+    Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
         .get("hadoop.proxyuser.test.hosts").size() == 1);
-    Assert.assertTrue(ProxyUsers.getProxyHosts()
+    Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
         .get("hadoop.proxyuser.test.hosts").contains("test_hosts"));
   }
 
@@ -708,14 +708,14 @@ public class TestRMAdminService {
           aclsString);
 
       // verify ProxyUsers and ProxyHosts
-      Assert.assertTrue(ProxyUsers.getProxyGroups()
+      Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
           .get("hadoop.proxyuser.test.groups").size() == 1);
-      Assert.assertTrue(ProxyUsers.getProxyGroups()
+      Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups()
           .get("hadoop.proxyuser.test.groups").contains("test_groups"));
 
-      Assert.assertTrue(ProxyUsers.getProxyHosts()
+      Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
           .get("hadoop.proxyuser.test.hosts").size() == 1);
-      Assert.assertTrue(ProxyUsers.getProxyHosts()
+      Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts()
           .get("hadoop.proxyuser.test.hosts").contains("test_hosts"));
 
       // verify UserToGroupsMappings

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Tue Aug 19 23:49:39 2014
@@ -28,7 +28,7 @@ import java.net.InetSocketAddress;
 
 import javax.ws.rs.core.MediaType;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +37,8 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
@@ -92,6 +94,9 @@ public class TestRMHA {
     // Enable webapp to test web-services also
     configuration.setBoolean(MockRM.ENABLE_WEBAPP, true);
     configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+    ClusterMetrics.destroy();
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.shutdown();
   }
 
   private void checkMonitorHealth() throws IOException {
@@ -326,6 +331,10 @@ public class TestRMHA {
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToActive(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
+    
+    MyCountingDispatcher dispatcher =
+        (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    assertTrue(!dispatcher.isStopped());
 
     rm.adminService.transitionToActive(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
@@ -334,6 +343,11 @@ public class TestRMHA {
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
 
+    
+    // Keep the dispatcher reference before transitioning to standby
+    dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    
+    
     rm.adminService.transitionToStandby(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
         ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@@ -341,6 +355,8 @@ public class TestRMHA {
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
 
+    assertTrue(dispatcher.isStopped());
+    
     rm.stop();
   }
 
@@ -375,7 +391,19 @@ public class TestRMHA {
   }
 
   @Test
-  public void testHAWithRMHostName() {
+  public void testHAWithRMHostName() throws Exception {
+    innerTestHAWithRMHostName(false);
+    configuration.clear();
+    setUp();
+    innerTestHAWithRMHostName(true);
+  }
+
+  public void innerTestHAWithRMHostName(boolean includeBindHost) {
+    //this is run two times, with and without a bind host configured
+    if (includeBindHost) {
+      configuration.set(YarnConfiguration.RM_BIND_HOST, "9.9.9.9");
+    }
+
     //test if both RM_HOSTBANE_{rm_id} and RM_RPCADDRESS_{rm_id} are set
     //We should only read rpc addresses from RM_RPCADDRESS_{rm_id} configuration
     configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
@@ -395,6 +423,15 @@ public class TestRMHA {
             RM2_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM2_NODE_ID)));
         assertEquals("RPC address not set for " + confKey,
             RM3_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM3_NODE_ID)));
+        if (includeBindHost) {
+          assertEquals("Web address misconfigured WITH bind-host",
+                       rm.webAppAddress.substring(0, 7), "9.9.9.9");
+        } else {
+          //YarnConfiguration tries to figure out which rm host it's on by binding to it,
+          //which doesn't happen for any of these fake addresses, so we end up with 0.0.0.0
+          assertEquals("Web address misconfigured WITHOUT bind-host",
+                       rm.webAppAddress.substring(0, 7), "0.0.0.0");
+        }
       }
     } catch (YarnRuntimeException e) {
       fail("Should not throw any exceptions.");
@@ -466,6 +503,8 @@ public class TestRMHA {
 
     private int eventHandlerCount;
 
+    private volatile boolean stopped = false;
+
     public MyCountingDispatcher() {
       super("MyCountingDispatcher");
       this.eventHandlerCount = 0;
@@ -484,5 +523,15 @@ public class TestRMHA {
     public int getEventHandlerCount() {
       return this.eventHandlerCount;
     }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      this.stopped = true;
+      super.serviceStop();
+    }
+
+    public boolean isStopped() {
+      return this.stopped;
+    }
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Tue Aug 19 23:49:39 2014
@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
-    node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null, null));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testContainerUpdate() throws InterruptedException{
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null, null));
     
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
-    node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node2.handle(new RMNodeStartedEvent(null, null, null));
     
     ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testStatusChange(){
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null, null));
     //Add info to the queue first
     node.setNextHeartBeat(false);
 
@@ -455,12 +456,16 @@ public class TestRMNodeTransitions {
   }
 
   private RMNodeImpl getRunningNode() {
+    return getRunningNode(null);
+  }
+
+  private RMNodeImpl getRunningNode(String nmVersion) {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
         null, ResourceOption.newInstance(capability,
-            RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null);
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+            RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
   }
@@ -491,7 +496,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
     Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -515,7 +520,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
     Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -530,4 +535,15 @@ public class TestRMNodeTransitions {
         nodesListManagerEvent.getType());
   }
 
+  @Test
+  public void testReconnnectUpdate() {
+    final String nmVersion1 = "nm version 1";
+    final String nmVersion2 = "nm version 2";
+    RMNodeImpl node = getRunningNode(nmVersion1);
+    Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
+    RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
+        null));
+    Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -67,13 +68,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -82,8 +85,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -100,7 +103,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestRMRestart {
-
   private final static File TEMP_DIR = new File(System.getProperty(
     "test.build.data", "/tmp"), "decommision");
   private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
@@ -287,11 +289,11 @@ public class TestRMRestart {
     
     // verify old AM is not accepted
     // change running AM to talk to new RM
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
     AllocateResponse allocResponse = am1.allocate(
         new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
-    Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
+    Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
     
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -303,13 +305,11 @@ public class TestRMRestart {
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
     nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
 
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+    NMContainerStatus status =
+        TestRMRestart
+          .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status), null);
     nm2.registerNode();
     
     rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -392,7 +392,7 @@ public class TestRMRestart {
     // completed apps are not removed immediately after app finish
     // And finished app is also loaded back.
     Assert.assertEquals(4, rmAppState.size());
- }
+  }
 
   @Test (timeout = 60000)
   public void testRMRestartAppRunningAMFailed() throws Exception {
@@ -510,14 +510,11 @@ public class TestRMRestart {
     Assert.assertEquals(RMAppAttemptState.LAUNCHED,
         rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
             .getAppAttemptState());
-    
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(
-            BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+
+    NMContainerStatus status =
+        TestRMRestart.createNMContainerStatus(
+          am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status), null);
     rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     launchAM(rmApp, rm2, nm1);
     Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -615,7 +612,7 @@ public class TestRMRestart {
 
       @Override
       public void updateApplicationStateInternal(ApplicationId appId,
-          ApplicationStateDataPBImpl appStateData) throws Exception {
+          ApplicationStateData appStateData) throws Exception {
         if (count == 0) {
           // do nothing; simulate app final state is not saved.
           LOG.info(appId + " final state is not saved.");
@@ -763,14 +760,14 @@ public class TestRMRestart {
       @Override
       public synchronized void storeApplicationAttemptStateInternal(
           ApplicationAttemptId attemptId,
-          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+          ApplicationAttemptStateData attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
 
       @Override
       public synchronized void updateApplicationAttemptStateInternal(
           ApplicationAttemptId attemptId,
-          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+          ApplicationAttemptStateData attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
     };
@@ -1211,18 +1208,13 @@ public class TestRMRestart {
     Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
       attemptState.getMasterContainer().getId());
 
-    // the appToken and clientTokenMasterKey that are generated when
+    // the clientTokenMasterKey that are generated when
     // RMAppAttempt is created,
-    HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
-    tokenSet.add(attempt1.getAMRMToken());
     byte[] clientTokenMasterKey =
         attempt1.getClientTokenMasterKey().getEncoded();
 
     // assert application credentials are saved
     Credentials savedCredentials = attemptState.getAppAttemptCredentials();
-    HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
-    savedTokens.addAll(savedCredentials.getAllTokens());
-    Assert.assertEquals(tokenSet, savedTokens);
     Assert.assertArrayEquals("client token master key not saved",
         clientTokenMasterKey, savedCredentials.getSecretKey(
             RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
@@ -1235,11 +1227,8 @@ public class TestRMRestart {
         rm2.getRMContext().getRMApps().get(app1.getApplicationId());
     RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
 
-    // assert loaded attempt recovered attempt tokens
+    // assert loaded attempt recovered
     Assert.assertNotNull(loadedAttempt1);
-    savedTokens.clear();
-    savedTokens.add(loadedAttempt1.getAMRMToken());
-    Assert.assertEquals(tokenSet, savedTokens);
 
     // assert client token master key is recovered back to api-versioned
     // client token master key
@@ -1638,14 +1627,20 @@ public class TestRMRestart {
 
     // create app that gets launched and does allocate before RM restart
     RMApp app1 = rm1.submitApp(200);
-    assertQueueMetrics(qm1, 1, 1, 0, 0);
-    nm1.nodeHeartbeat(true);
+    // Need to wait first for AppAttempt to be started (RMAppState.ACCEPTED)
+    // and then for it to reach RMAppAttemptState.SCHEDULED
+    // inorder to ensure appsPending metric is incremented
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.SCHEDULED);
+    assertQueueMetrics(qm1, 1, 1, 0, 0);
+
+    nm1.nodeHeartbeat(true);
     rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
     MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); 
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
@@ -1660,24 +1655,25 @@ public class TestRMRestart {
     // PHASE 2: create new RM and start from old state
     // create new RM to represent restart and recover state
     MockRM rm2 = new MockRM(conf, memStore);
-    rm2.start();
-    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
     resetQueueMetrics(qm2);
     assertQueueMetrics(qm2, 0, 0, 0, 0);
+
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     // recover app
     RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
     am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+
+    NMContainerStatus status =
+        TestRMRestart
+          .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status), null);
+
     while (loadedApp1.getAppAttempts().size() != 2) {
       Thread.sleep(200);
     }
@@ -1801,12 +1797,10 @@ public class TestRMRestart {
             // ResourceTrackerService is started.
             super.serviceStart();
             nm1.setResourceTrackerService(getResourceTrackerService());
-            List<ContainerStatus> status = new ArrayList<ContainerStatus>();
-            ContainerId amContainer =
-                ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
-            status.add(ContainerStatus.newInstance(amContainer,
-              ContainerState.COMPLETE, "AM container exit", 143));
-            nm1.registerNode(status);
+            NMContainerStatus status =
+                TestRMRestart.createNMContainerStatus(
+                  am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+            nm1.registerNode(Arrays.asList(status), null);
           }
         };
       }
@@ -1845,6 +1839,16 @@ public class TestRMRestart {
     }
   }
 
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+          Resource.newInstance(1024, 1), "recover container", 0,
+          Priority.newInstance(0), 0);
+    return containerReport;
+  }
+
   public class TestMemoryRMStateStore extends MemoryRMStateStore {
     int count = 0;
     public int updateApp = 0;
@@ -1852,7 +1856,7 @@ public class TestRMRestart {
 
     @Override
     public void updateApplicationStateInternal(ApplicationId appId,
-        ApplicationStateDataPBImpl appStateData) throws Exception {
+        ApplicationStateData appStateData) throws Exception {
       updateApp = ++count;
       super.updateApplicationStateInternal(appId, appStateData);
     }
@@ -1861,7 +1865,7 @@ public class TestRMRestart {
     public synchronized void
         updateApplicationAttemptStateInternal(
             ApplicationAttemptId attemptId,
-            ApplicationAttemptStateDataPBImpl attemptStateData)
+            ApplicationAttemptStateData attemptStateData)
             throws Exception {
       updateAttempt = ++count;
       super.updateApplicationAttemptStateInternal(attemptId,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,10 @@ import java.util.Collection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,8 +42,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -235,4 +240,75 @@ public class TestResourceManager {
     }
   }
 
+  @Test(timeout = 50000)
+  public void testFilterOverrides() throws Exception {
+    String filterInitializerConfKey = "hadoop.http.filter.initializers";
+    String[] filterInitializers =
+        {
+            AuthenticationFilterInitializer.class.getName(),
+            RMAuthenticationFilterInitializer.class.getName(),
+            AuthenticationFilterInitializer.class.getName() + ","
+                + RMAuthenticationFilterInitializer.class.getName(),
+            AuthenticationFilterInitializer.class.getName() + ", "
+                + RMAuthenticationFilterInitializer.class.getName(),
+            AuthenticationFilterInitializer.class.getName() + ", "
+                + this.getClass().getName() };
+    for (String filterInitializer : filterInitializers) {
+      resourceManager = new ResourceManager();
+      Configuration conf = new YarnConfiguration();
+      conf.set(filterInitializerConfKey, filterInitializer);
+      conf.set("hadoop.security.authentication", "kerberos");
+      conf.set("hadoop.http.authentication.type", "kerberos");
+      try {
+        try {
+          UserGroupInformation.setConfiguration(conf);
+        } catch (Exception e) {
+          // ignore we just care about getting true for
+          // isSecurityEnabled()
+          LOG.info("Got expected exception");
+        }
+        resourceManager.init(conf);
+        resourceManager.startWepApp();
+      } catch (RuntimeException e) {
+        // Exceptions are expected because we didn't setup everything
+        // just want to test filter settings
+        String tmp = resourceManager.getConfig().get(filterInitializerConfKey);
+        if (filterInitializer.contains(this.getClass().getName())) {
+          Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName()
+              + "," + this.getClass().getName(), tmp);
+        } else {
+          Assert.assertEquals(
+            RMAuthenticationFilterInitializer.class.getName(), tmp);
+        }
+        resourceManager.stop();
+      }
+    }
+
+    // simple mode overrides
+    String[] simpleFilterInitializers =
+        { "", StaticUserWebFilter.class.getName() };
+    for (String filterInitializer : simpleFilterInitializers) {
+      resourceManager = new ResourceManager();
+      Configuration conf = new YarnConfiguration();
+      conf.set(filterInitializerConfKey, filterInitializer);
+      try {
+        UserGroupInformation.setConfiguration(conf);
+        resourceManager.init(conf);
+        resourceManager.startWepApp();
+      } catch (RuntimeException e) {
+        // Exceptions are expected because we didn't setup everything
+        // just want to test filter settings
+        String tmp = resourceManager.getConfig().get(filterInitializerConfKey);
+        if (filterInitializer.equals(StaticUserWebFilter.class.getName())) {
+          Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName()
+              + "," + StaticUserWebFilter.class.getName(), tmp);
+        } else {
+          Assert.assertEquals(
+            RMAuthenticationFilterInitializer.class.getName(), tmp);
+        }
+        resourceManager.stop();
+      }
+    }
+  }
+
 }