You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by da...@apache.org on 2013/02/06 17:37:12 UTC

svn commit: r1443055 - in /hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/h...

Author: daryn
Date: Wed Feb  6 16:37:11 2013
New Revision: 1443055

URL: http://svn.apache.org/viewvc?rev=1443055&view=rev
Log:
svn merge -c 1443016 FIXES: YARN-357. App submission should not be synchronized (daryn)

Modified:
    hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

Modified: hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/CHANGES.txt?rev=1443055&r1=1443054&r2=1443055&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/CHANGES.txt Wed Feb  6 16:37:11 2013
@@ -270,6 +270,8 @@ Release 0.23.7 - UNRELEASED
 
   OPTIMIZATIONS
 
+    YARN-357. App submission should not be synchronized (daryn)
+
   BUG FIXES
 
     YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan 

Modified: hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1443055&r1=1443054&r2=1443055&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Feb  6 16:37:11 2013
@@ -228,7 +228,7 @@ public class RMAppManager implements Eve
   }
 
   @SuppressWarnings("unchecked")
-  protected synchronized void submitApplication(
+  protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime) {
     ApplicationId applicationId = submissionContext.getApplicationId();
     RMApp application = null;

Modified: hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1443055&r1=1443054&r2=1443055&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/branch-2.0.3-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Wed Feb  6 16:37:11 2013
@@ -27,7 +27,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 
 import junit.framework.Assert;
 
@@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -44,28 +47,36 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.Test;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-
+import org.junit.Test;
 
 public class TestClientRMService {
 
@@ -235,6 +246,88 @@ public class TestClientRMService {
     rmService.renewDelegationToken(request);
   }
   
+  @Test(timeout=4000)
+  public void testConcurrentAppSubmit()
+      throws IOException, InterruptedException, BrokenBarrierException {
+    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    RMStateStore stateStore = mock(RMStateStore.class);
+    when(rmContext.getStateStore()).thenReturn(stateStore);
+    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+        null, mock(ApplicationACLsManager.class), new Configuration());
+
+    final ApplicationId appId1 = getApplicationId(100);
+    final ApplicationId appId2 = getApplicationId(101);
+    final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
+    final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
+    
+    final CyclicBarrier startBarrier = new CyclicBarrier(2);
+    final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+    @SuppressWarnings("rawtypes")
+    EventHandler eventHandler = new EventHandler() {
+      @Override
+      public void handle(Event rawEvent) {
+        if (rawEvent instanceof RMAppEvent) {
+          RMAppEvent event = (RMAppEvent) rawEvent;
+          if (event.getApplicationId().equals(appId1)) {
+            try {
+              startBarrier.await();
+              endBarrier.await();
+            } catch (BrokenBarrierException e) {
+              LOG.warn("Broken Barrier", e);
+            } catch (InterruptedException e) {
+              LOG.warn("Interrupted while awaiting barriers", e);
+            }
+          }
+        }
+      }
+    };
+
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
+      
+    final ClientRMService rmService =
+        new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+    // submit an app and wait for it to block while in app submission
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          rmService.submitApplication(submitRequest1);
+        } catch (YarnRemoteException e) {}
+      }
+    };
+    t.start();
+    
+    // submit another app, so go through while the first app is blocked
+    startBarrier.await();
+    rmService.submitApplication(submitRequest2);
+    endBarrier.await();
+    t.join();
+  }
+ 
+  private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
+    String user = MockApps.newUserName();
+    String queue = MockApps.newQueue();
+
+    ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
+    Resource resource = mock(Resource.class);
+    when(amContainerSpec.getResource()).thenReturn(resource);
+
+    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getUser()).thenReturn(user);
+    when(submissionContext.getQueue()).thenReturn(queue);
+    when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
+    when(submissionContext.getApplicationId()).thenReturn(appId);
+    
+   SubmitApplicationRequest submitRequest =
+       recordFactory.newRecordInstance(SubmitApplicationRequest.class);
+   submitRequest.setApplicationSubmissionContext(submissionContext);
+   return submitRequest;
+  }
+
   private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
       throws IOException {
     Dispatcher dispatcher = mock(Dispatcher.class);