You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by at...@apache.org on 2013/02/16 02:12:14 UTC

svn commit: r1446832 [2/3] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ hadoop...

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java Sat Feb 16 01:12:07 2013
@@ -48,6 +48,7 @@ public class CapacitySchedulerQueueInfo 
   protected String queueName;
   protected QueueState state;
   protected CapacitySchedulerQueueInfoList queues;
+  protected ResourceInfo resourcesUsed;
 
   CapacitySchedulerQueueInfo() {
   };
@@ -69,6 +70,7 @@ public class CapacitySchedulerQueueInfo 
     usedResources = q.getUsedResources().toString();
     queueName = q.getQueueName();
     state = q.getState();
+    resourcesUsed = new ResourceInfo(q.getUsedResources());
   }
 
   public float getCapacity() {
@@ -119,6 +121,10 @@ public class CapacitySchedulerQueueInfo 
     return this.queues;
   }
 
+  public ResourceInfo getResourcesUsed() {
+    return resourcesUsed;
+  }
+
   /**
    * Limit a value to a specified range.
    * @param val the value to be capped

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Sat Feb 16 01:12:07 2013
@@ -18,16 +18,19 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.security.PrivilegedAction;
 import java.util.Map;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -118,21 +122,27 @@ public class MockRM extends ResourceMana
   }
 
   public RMApp submitApp(int masterMemory) throws Exception {
-    return submitApp(masterMemory, "", "");
+    return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+      .getShortUserName());
   }
 
   // client
   public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
-    return submitApp(masterMemory, name, user, null, false);
+    return submitApp(masterMemory, name, user, null, false, null);
   }
   
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls) throws Exception {
-    return submitApp(masterMemory, name, user, acls, false);
+    return submitApp(masterMemory, name, user, acls, false, null);
   }  
 
   public RMApp submitApp(int masterMemory, String name, String user,
-      Map<ApplicationAccessType, String> acls, boolean unmanaged) throws Exception {
+      Map<ApplicationAccessType, String> acls, String queue) throws Exception {
+    return submitApp(masterMemory, name, user, acls, false, queue);
+  }  
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue) throws Exception {
     ClientRMProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -148,6 +158,9 @@ public class MockRM extends ResourceMana
     if(unmanaged) {
       sub.setUnmanagedAM(true);
     }
+    if (queue != null) {
+      sub.setQueue(queue);
+    }
     ContainerLaunchContext clc = Records
         .newRecord(ContainerLaunchContext.class);
     Resource capability = Records.newRecord(Resource.class);
@@ -157,7 +170,29 @@ public class MockRM extends ResourceMana
     sub.setAMContainerSpec(clc);
     req.setApplicationSubmissionContext(sub);
 
-    client.submitApplication(req);
+    UserGroupInformation fakeUser =
+      UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
+    PrivilegedAction<SubmitApplicationResponse> action =
+      new PrivilegedAction<SubmitApplicationResponse>() {
+      ClientRMProtocol client;
+      SubmitApplicationRequest req;
+      @Override
+      public SubmitApplicationResponse run() {
+        try {
+          return client.submitApplication(req);
+        } catch (YarnRemoteException e) {
+          e.printStackTrace();
+        }
+        return null;
+      }
+      PrivilegedAction<SubmitApplicationResponse> setClientReq(
+        ClientRMProtocol client, SubmitApplicationRequest req) {
+        this.client = client;
+        this.req = req;
+        return this;
+      }
+    }.setClientReq(client, req);
+    fakeUser.doAs(action);
     // make sure app is immediately available after submit
     waitForState(appId, RMAppState.ACCEPTED);
     return getRMContext().getRMApps().get(appId);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Sat Feb 16 01:12:07 2013
@@ -27,7 +27,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 
 import junit.framework.Assert;
 
@@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -44,28 +47,36 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.Test;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-
+import org.junit.Test;
 
 public class TestClientRMService {
 
@@ -235,6 +246,88 @@ public class TestClientRMService {
     rmService.renewDelegationToken(request);
   }
   
+  @Test(timeout=4000)
+  public void testConcurrentAppSubmit()
+      throws IOException, InterruptedException, BrokenBarrierException {
+    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    RMStateStore stateStore = mock(RMStateStore.class);
+    when(rmContext.getStateStore()).thenReturn(stateStore);
+    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+        null, mock(ApplicationACLsManager.class), new Configuration());
+
+    final ApplicationId appId1 = getApplicationId(100);
+    final ApplicationId appId2 = getApplicationId(101);
+    final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
+    final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
+    
+    final CyclicBarrier startBarrier = new CyclicBarrier(2);
+    final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+    @SuppressWarnings("rawtypes")
+    EventHandler eventHandler = new EventHandler() {
+      @Override
+      public void handle(Event rawEvent) {
+        if (rawEvent instanceof RMAppEvent) {
+          RMAppEvent event = (RMAppEvent) rawEvent;
+          if (event.getApplicationId().equals(appId1)) {
+            try {
+              startBarrier.await();
+              endBarrier.await();
+            } catch (BrokenBarrierException e) {
+              LOG.warn("Broken Barrier", e);
+            } catch (InterruptedException e) {
+              LOG.warn("Interrupted while awaiting barriers", e);
+            }
+          }
+        }
+      }
+    };
+
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
+      
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+    // submit an app and wait for it to block while in app submission
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          rmService.submitApplication(submitRequest1);
+        } catch (YarnRemoteException e) {}
+      }
+    };
+    t.start();
+    
+    // submit another app, so go through while the first app is blocked
+    startBarrier.await();
+    rmService.submitApplication(submitRequest2);
+    endBarrier.await();
+    t.join();
+  }
+ 
+  private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
+    String user = MockApps.newUserName();
+    String queue = MockApps.newQueue();
+
+    ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
+    Resource resource = mock(Resource.class);
+    when(amContainerSpec.getResource()).thenReturn(resource);
+
+    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getUser()).thenReturn(user);
+    when(submissionContext.getQueue()).thenReturn(queue);
+    when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
+    when(submissionContext.getApplicationId()).thenReturn(appId);
+    
+   SubmitApplicationRequest submitRequest =
+       recordFactory.newRecordInstance(SubmitApplicationRequest.class);
+   submitRequest.setApplicationSubmissionContext(submissionContext);
+   return submitRequest;
+  }
+
   private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
       throws IOException {
     Dispatcher dispatcher = mock(Dispatcher.class);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Sat Feb 16 01:12:07 2013
@@ -17,13 +17,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
@@ -34,9 +33,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
 import org.junit.Test;
 
 
@@ -59,6 +66,10 @@ public class TestClientRMTokens {
 
   private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
   
+  @Before
+  public void resetSecretManager() {
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+  }
   
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
@@ -200,7 +211,122 @@ public class TestClientRMTokens {
         RPC.stopProxy(clientRMWithDT);
       }
     }
+  }
+  
+  @Test
+  public void testShortCircuitRenewCancel()
+      throws IOException, InterruptedException {
+    InetSocketAddress addr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(addr, addr, true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelWildcardAddress()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr = new InetSocketAddress(123);
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+        true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelSameHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostSamePort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
+        false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
+                                            InetSocketAddress serviceAddr,
+                                            boolean shouldShortCircuit
+      ) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
+        YarnBadRPC.class, YarnRPC.class);
     
+    RMDelegationTokenSecretManager secretManager =
+        mock(RMDelegationTokenSecretManager.class);
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
+
+    RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
+        new Text("owner"), new Text("renewer"), null);
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>(ident, secretManager);
+
+    SecurityUtil.setTokenService(token, serviceAddr);
+    if (shouldShortCircuit) {
+      token.renew(conf);
+      verify(secretManager).renewToken(eq(token), eq("renewer"));
+      reset(secretManager);
+      token.cancel(conf);
+      verify(secretManager).cancelToken(eq(token), eq("renewer"));
+    } else {      
+      try { 
+        token.renew(conf);
+        fail();
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).renewToken(any(Token.class), anyString());
+      try { 
+        token.cancel(conf);
+        fail();
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).cancelToken(any(Token.class), anyString());
+    }
+  }
+  
+  @SuppressWarnings("rawtypes")
+  public static class YarnBadRPC extends YarnRPC {
+    @Override
+    public Object getProxy(Class protocol, InetSocketAddress addr,
+        Configuration conf) {
+      throw new RuntimeException("getProxy");
+    }
+
+    @Override
+    public void stopProxy(Object proxy, Configuration conf) {
+      throw new RuntimeException("stopProxy");
+    }
+
+    @Override
+    public Server getServer(Class protocol, Object instance,
+        InetSocketAddress addr, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager,
+        int numHandlers, String portRangeConfig) {
+      throw new RuntimeException("getServer");
+    }
   }
   
   // Get the delegation token directly as it is a little difficult to setup

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Sat Feb 16 01:12:07 2013
@@ -35,8 +35,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Sat Feb 16 01:12:07 2013
@@ -152,7 +152,7 @@ public class TestRMRestart {
         .getApplicationId());
     
     // create unmanaged app
-    RMApp appUnmanaged = rm1.submitApp(200, "", "", null, true);
+    RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null);
     ApplicationAttemptId unmanagedAttemptId = 
                         appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
     // assert appUnmanaged info is saved

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Sat Feb 16 01:12:07 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Sat Feb 16 01:12:07 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Sat Feb 16 01:12:07 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@@ -54,7 +55,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Sat Feb 16 01:12:07 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -42,23 +43,35 @@ public class TestSchedulerUtils {
 
     // case negative memory
     ask.setCapability(Resources.createResource(-1024));
+    Resource before = ask.getCapability();
     SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+    Resource after = ask.getCapability();
     assertEquals(minMemory, ask.getCapability().getMemory());
+    assertTrue(before == after);
 
     // case zero memory
     ask.setCapability(Resources.createResource(0));
+    before = ask.getCapability();
     SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+    after = ask.getCapability();
     assertEquals(minMemory, ask.getCapability().getMemory());
+    assertTrue(before == after);
 
     // case memory is a multiple of minMemory
     ask.setCapability(Resources.createResource(2 * minMemory));
+    before = ask.getCapability();
     SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+    after = ask.getCapability();
     assertEquals(2 * minMemory, ask.getCapability().getMemory());
+    assertTrue(before == after);
 
     // case memory is not a multiple of minMemory
     ask.setCapability(Resources.createResource(minMemory + 10));
+    before = ask.getCapability();
     SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
+    after = ask.getCapability();
     assertEquals(2 * minMemory, ask.getCapability().getMemory());
+    assertTrue(before == after);
 
   }
   
@@ -73,24 +86,33 @@ public class TestSchedulerUtils {
 
     // case negative memory/vcores
     ask.setCapability(Resources.createResource(-1024, -1));
+    Resource before = ask.getCapability();
     SchedulerUtils.normalizeRequest(
         ask, resourceCalculator, clusterResource, minResource);
+    Resource after = ask.getCapability();
     assertEquals(minResource, ask.getCapability());
+    assertTrue(before == after);
 
     // case zero memory/vcores
     ask.setCapability(Resources.createResource(0, 0));
+    before = ask.getCapability();
     SchedulerUtils.normalizeRequest(
         ask, resourceCalculator, clusterResource, minResource);
+    after = ask.getCapability();
     assertEquals(minResource, ask.getCapability());
     assertEquals(1, ask.getCapability().getVirtualCores());
     assertEquals(1024, ask.getCapability().getMemory());
+    assertTrue(before == after);
 
     // case non-zero memory & zero cores
     ask.setCapability(Resources.createResource(1536, 0));
+    before = ask.getCapability();
     SchedulerUtils.normalizeRequest(
         ask, resourceCalculator, clusterResource, minResource);
+    after = ask.getCapability();
     assertEquals(Resources.createResource(2048, 1), ask.getCapability());
     assertEquals(1, ask.getCapability().getVirtualCores());
     assertEquals(2048, ask.getCapability().getMemory());
+    assertTrue(before == after);
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sat Feb 16 01:12:07 2013
@@ -33,13 +33,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Sat Feb 16 01:12:07 2013
@@ -21,11 +21,17 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +56,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * unit test - 
@@ -541,4 +549,54 @@ public class TestDelegationTokenRenewer 
       fail("Renewal of cancelled token should have failed");
     } catch (InvalidToken ite) {}
   }
+  
+  @Test(timeout=2000)
+  public void testConncurrentAddApplication()
+      throws IOException, InterruptedException, BrokenBarrierException {
+    final CyclicBarrier startBarrier = new CyclicBarrier(2);
+    final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+    // this token uses barriers to block during renew
+    final Credentials creds1 = new Credentials();
+    final Token<?> token1 = mock(Token.class);
+    creds1.addToken(new Text("token"), token1);
+    doReturn(true).when(token1).isManaged();
+    doAnswer(new Answer<Long>() {
+      public Long answer(InvocationOnMock invocation)
+          throws InterruptedException, BrokenBarrierException {
+        startBarrier.await();
+        endBarrier.await();
+        return Long.MAX_VALUE;
+      }}).when(token1).renew(any(Configuration.class));
+
+    // this dummy token fakes renewing
+    final Credentials creds2 = new Credentials();
+    final Token<?> token2 = mock(Token.class);
+    creds2.addToken(new Text("token"), token2);
+    doReturn(true).when(token2).isManaged();
+    doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
+
+    // fire up the renewer
+    final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
+    dtr.init(conf);
+    dtr.start();
+    
+    // submit a job that blocks during renewal
+    Thread submitThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          dtr.addApplication(mock(ApplicationId.class), creds1, false);
+        } catch (IOException e) {}        
+      }
+    };
+    submitThread.start();
+    
+    // wait till 1st submit blocks, then submit another
+    startBarrier.await();
+    dtr.addApplication(mock(ApplicationId.class), creds2, false);
+    // signal 1st to complete
+    endBarrier.await();
+    submitThread.join();
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java Sat Feb 16 01:12:07 2013
@@ -27,10 +27,12 @@ import javax.ws.rs.core.MediaType;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -44,6 +46,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.InputSource;
 
@@ -355,10 +358,10 @@ public class TestRMWebServicesCapacitySc
   private void verifySubQueue(JSONObject info, String q, 
       float parentAbsCapacity, float parentAbsMaxCapacity)
       throws JSONException, Exception {
-    int numExpectedElements = 11;
+    int numExpectedElements = 12;
     boolean isParentQueue = true;
     if (!info.has("queues")) {
-      numExpectedElements = 20;
+      numExpectedElements = 22;
       isParentQueue = false;
     }
     assertEquals("incorrect number of elements", numExpectedElements, info.length());
@@ -397,6 +400,8 @@ public class TestRMWebServicesCapacitySc
       lqi.userLimit = info.getInt("userLimit");
       lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
       verifyLeafQueueGeneric(q, lqi);
+      // resourcesUsed and users (per-user resources used) are checked in
+      // testPerUserResource()
     }
   }
 
@@ -464,4 +469,143 @@ public class TestRMWebServicesCapacitySc
     assertEquals("userLimitFactor doesn't match",
         csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);
   }
+
+  //Return a child Node of node with the tagname or null if none exists 
+  private Node getChildNodeByName(Node node, String tagname) {
+    NodeList nodeList = node.getChildNodes();
+    for (int i=0; i < nodeList.getLength(); ++i) {
+      if (nodeList.item(i).getNodeName().equals(tagname)) {
+        return nodeList.item(i);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Test per user resources and resourcesUsed elements in the web services XML
+   * @throws Exception
+   */
+  @Test
+  public void testPerUserResourcesXML() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+    try {
+      rm.submitApp(10, "app1", "user1", null, "b1");
+      rm.submitApp(20, "app2", "user2", null, "b1");
+
+      //Get the XML from ws/v1/cluster/scheduler
+      WebResource r = resource();
+      ClientResponse response = r.path("ws/v1/cluster/scheduler")
+        .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
+      String xml = response.getEntity(String.class);
+      DocumentBuilder db = DocumentBuilderFactory.newInstance()
+        .newDocumentBuilder();
+      InputSource is = new InputSource();
+      is.setCharacterStream(new StringReader(xml));
+      //Parse the XML we got
+      Document dom = db.parse(is);
+
+      //Get all users elements (1 for each leaf queue)
+      NodeList allUsers = dom.getElementsByTagName("users");
+      for (int i=0; i<allUsers.getLength(); ++i) {
+        Node perUserResources = allUsers.item(i);
+        String queueName = getChildNodeByName(perUserResources
+          .getParentNode(), "queueName").getTextContent();
+        if (queueName.equals("b1")) {
+          //b1 should have two users (user1 and user2) which submitted jobs
+          assertEquals(2, perUserResources.getChildNodes().getLength());
+          NodeList users = perUserResources.getChildNodes();
+          for (int j=0; j<users.getLength(); ++j) {
+            Node user = users.item(j);
+            String username = getChildNodeByName(user, "username")
+              .getTextContent(); 
+            assertTrue(username.equals("user1") || username.equals("user2"));
+            //Should be a parsable integer
+            Integer.parseInt(getChildNodeByName(getChildNodeByName(user,
+              "resourcesUsed"), "memory").getTextContent());
+            Integer.parseInt(getChildNodeByName(user, "numActiveApplications")
+              .getTextContent());
+            Integer.parseInt(getChildNodeByName(user, "numPendingApplications")
+                .getTextContent());
+          }
+        } else {
+        //Queues other than b1 should have 0 users
+          assertEquals(0, perUserResources.getChildNodes().getLength());
+        }
+      }
+      NodeList allResourcesUsed = dom.getElementsByTagName("resourcesUsed");
+      for (int i=0; i<allResourcesUsed.getLength(); ++i) {
+        Node resourcesUsed = allResourcesUsed.item(i);
+        Integer.parseInt(getChildNodeByName(resourcesUsed, "memory")
+            .getTextContent());
+        Integer.parseInt(getChildNodeByName(resourcesUsed, "vCores")
+              .getTextContent());
+      }
+    } finally {
+      rm.stop();
+    }
+  }
+
+  private void checkResourcesUsed(JSONObject queue) throws JSONException {
+    queue.getJSONObject("resourcesUsed").getInt("memory");
+    queue.getJSONObject("resourcesUsed").getInt("vCores");
+  }
+
+  //Also checks resourcesUsed
+  private JSONObject getSubQueue(JSONObject queue, String subQueue)
+    throws JSONException {
+    JSONArray queues = queue.getJSONObject("queues").getJSONArray("queue");
+    for (int i=0; i<queues.length(); ++i) {
+      checkResourcesUsed(queues.getJSONObject(i));
+      if (queues.getJSONObject(i).getString("queueName").equals(subQueue) ) {
+        return queues.getJSONObject(i);
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testPerUserResourcesJSON() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+    try {
+      rm.submitApp(10, "app1", "user1", null, "b1");
+      rm.submitApp(20, "app2", "user2", null, "b1");
+
+      //Get JSON
+      WebResource r = resource();
+      ClientResponse response = r.path("ws").path("v1").path("cluster")
+          .path("scheduler/").accept(MediaType.APPLICATION_JSON)
+          .get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      JSONObject schedulerInfo = json.getJSONObject("scheduler").getJSONObject(
+        "schedulerInfo");
+      JSONObject b1 = getSubQueue(getSubQueue(schedulerInfo, "b"), "b1");
+      //Check users user1 and user2 exist in b1
+      JSONArray users = b1.getJSONObject("users").getJSONArray("user");
+      for (int i=0; i<2; ++i) {
+        JSONObject user = users.getJSONObject(i);
+        assertTrue("User isn't user1 or user2",user.getString("username")
+          .equals("user1") || user.getString("username").equals("user2"));
+        user.getInt("numActiveApplications");
+        user.getInt("numPendingApplications");
+        checkResourcesUsed(user);
+      }
+    } finally {
+      rm.stop();
+    }
+  }
+
+
+  @Test
+  public void testResourceInfo() {
+    Resource res = Resources.createResource(10, 1);
+    // If we add a new resource (e.g disks), then
+    // CapacitySchedulerPage and these RM WebServices + docs need to be updated
+    // eg. ResourceInfo
+    assertEquals("<memory:10, vCores:1>", res.toString());
+  }
 }