You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by da...@apache.org on 2013/02/06 16:36:56 UTC
svn commit: r1443021 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn...
Author: daryn
Date: Wed Feb 6 15:36:55 2013
New Revision: 1443021
URL: http://svn.apache.org/viewvc?rev=1443021&view=rev
Log:
svn merge -c 1443016 FIXES: YARN-357. App submission should not be synchronized (daryn)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1443021&r1=1443020&r2=1443021&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 6 15:36:55 2013
@@ -282,6 +282,8 @@ Release 0.23.7 - UNRELEASED
OPTIMIZATIONS
+ YARN-357. App submission should not be synchronized (daryn)
+
BUG FIXES
YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1443021&r1=1443020&r2=1443021&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Feb 6 15:36:55 2013
@@ -228,7 +228,7 @@ public class RMAppManager implements Eve
}
@SuppressWarnings("unchecked")
- protected synchronized void submitApplication(
+ protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime) {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1443021&r1=1443020&r2=1443021&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Wed Feb 6 15:36:55 2013
@@ -27,7 +27,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
@@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -44,28 +47,36 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.junit.Test;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-
+import org.junit.Test;
public class TestClientRMService {
@@ -235,6 +246,88 @@ public class TestClientRMService {
rmService.renewDelegationToken(request);
}
+ @Test(timeout=4000)
+ public void testConcurrentAppSubmit()
+ throws IOException, InterruptedException, BrokenBarrierException {
+ YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(yarnScheduler, rmContext);
+ RMStateStore stateStore = mock(RMStateStore.class);
+ when(rmContext.getStateStore()).thenReturn(stateStore);
+ RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+ null, mock(ApplicationACLsManager.class), new Configuration());
+
+ final ApplicationId appId1 = getApplicationId(100);
+ final ApplicationId appId2 = getApplicationId(101);
+ final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
+ final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
+
+ final CyclicBarrier startBarrier = new CyclicBarrier(2);
+ final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+ @SuppressWarnings("rawtypes")
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void handle(Event rawEvent) {
+ if (rawEvent instanceof RMAppEvent) {
+ RMAppEvent event = (RMAppEvent) rawEvent;
+ if (event.getApplicationId().equals(appId1)) {
+ try {
+ startBarrier.await();
+ endBarrier.await();
+ } catch (BrokenBarrierException e) {
+ LOG.warn("Broken Barrier", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while awaiting barriers", e);
+ }
+ }
+ }
+ }
+ };
+
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
+
+ final ClientRMService rmService =
+ new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+ // submit an app and wait for it to block while in app submission
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ rmService.submitApplication(submitRequest1);
+ } catch (YarnRemoteException e) {}
+ }
+ };
+ t.start();
+
+ // submit another app, so go through while the first app is blocked
+ startBarrier.await();
+ rmService.submitApplication(submitRequest2);
+ endBarrier.await();
+ t.join();
+ }
+
+ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
+ String user = MockApps.newUserName();
+ String queue = MockApps.newQueue();
+
+ ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
+ Resource resource = mock(Resource.class);
+ when(amContainerSpec.getResource()).thenReturn(resource);
+
+ ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+ when(submissionContext.getUser()).thenReturn(user);
+ when(submissionContext.getQueue()).thenReturn(queue);
+ when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
+ when(submissionContext.getApplicationId()).thenReturn(appId);
+
+ SubmitApplicationRequest submitRequest =
+ recordFactory.newRecordInstance(SubmitApplicationRequest.class);
+ submitRequest.setApplicationSubmissionContext(submissionContext);
+ return submitRequest;
+ }
+
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
throws IOException {
Dispatcher dispatcher = mock(Dispatcher.class);