Posted to yarn-issues@hadoop.apache.org by "NING DING (JIRA)" <ji...@apache.org> on 2015/11/30 05:04:10 UTC
[jira] [Commented] (YARN-4398) Yarn recover functionality causes
the cluster running slowly and the cluster usage rate is far below 100
[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15031300#comment-15031300 ]
NING DING commented on YARN-4398:
---------------------------------
[~jianhe], could you please take a look at this issue?
> Yarn recover functionality causes the cluster running slowly and the cluster usage rate is far below 100
> --------------------------------------------------------------------------------------------------------
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
> Issue Type: Bug
> Components: resourcemanager
> Affects Versions: 2.7.1
> Reporter: NING DING
> Attachments: YARN-4398.2.patch
>
>
> In my Hadoop cluster, ResourceManager recovery is enabled with FileSystemRMStateStore.
> I found that this makes the YARN cluster run slowly: the cluster usage rate is only 50% even when many applications are pending.
> The scenario is as follows.
> In thread A, RMAppImpl$RMAppNewlySavingTransition calls the storeNewApplication method defined in RMStateStore. This method is synchronized.
> {code:title=RMAppImpl.java|borderStyle=solid}
> private static final class RMAppNewlySavingTransition extends RMAppTransition {
>   @Override
>   public void transition(RMAppImpl app, RMAppEvent event) {
>     // If recovery is enabled then store the application information in a
>     // non-blocking call so make sure that RM has stored the information
>     // needed to restart the AM after RM restart without further client
>     // communication
>     LOG.info("Storing application with id " + app.applicationId);
>     app.rmContext.getStateStore().storeNewApplication(app);
>   }
> }
> {code}
> {code:title=RMStateStore.java|borderStyle=solid}
> public synchronized void storeNewApplication(RMApp app) {
>   ApplicationSubmissionContext context = app
>       .getApplicationSubmissionContext();
>   assert context instanceof ApplicationSubmissionContextPBImpl;
>   ApplicationStateData appState =
>       ApplicationStateData.newInstance(
>           app.getSubmitTime(), app.getStartTime(), context, app.getUser());
>   dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
> }
> {code}
> In thread B, FileSystemRMStateStore calls the storeApplicationStateInternal method, which is also synchronized.
> This method saves an ApplicationStateData to HDFS, which normally takes 90 to 300 milliseconds in my cluster.
> {code:title=FileSystemRMStateStore.java|borderStyle=solid}
> public synchronized void storeApplicationStateInternal(ApplicationId appId,
>     ApplicationStateData appStateDataPB) throws Exception {
>   Path appDirPath = getAppDir(rmAppRoot, appId);
>   mkdirsWithRetries(appDirPath);
>   Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
>   LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
>   byte[] appStateData = appStateDataPB.getProto().toByteArray();
>   try {
>     // currently throw all exceptions. May need to respond differently for HA
>     // based on whether we have lost the right to write to FS
>     writeFileWithRetries(nodeCreatePath, appStateData, true);
>   } catch (Exception e) {
>     LOG.info("Error storing info for app: " + appId, e);
>     throw e;
>   }
> }
> {code}
> Suppose thread B enters FileSystemRMStateStore.storeApplicationStateInternal first. Thread A is then blocked until thread B returns, because both synchronized methods lock the same object: the ResourceManager has only one RMStateStore instance, which in my cluster is a FileSystemRMStateStore.
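The blocking between the two synchronized methods can be reproduced in isolation. What follows is a minimal sketch, not Hadoop code: `Store`, `slowWrite`, and `enqueue` are hypothetical stand-ins for the state store and its two synchronized methods, and the sleep stands in for the HDFS write.

```java
import java.util.concurrent.CountDownLatch;

final class MonitorContentionSketch {

    /** Hypothetical stand-in for the single state store instance. */
    static final class Store {
        final CountDownLatch writeStarted = new CountDownLatch(1);

        // Stand-in for the slow synchronized write (thread B).
        synchronized void slowWrite(long millis) {
            writeStarted.countDown();
            pause(millis);
        }

        // Stand-in for the cheap synchronized call (thread A).
        synchronized void enqueue() {
            // Nothing to do; acquiring the monitor is the point.
        }
    }

    /** Returns how long enqueue() waited while slowWrite() held the monitor. */
    static long measureBlockedMillis(long writeMillis) {
        Store store = new Store();
        Thread threadB = new Thread(() -> store.slowWrite(writeMillis));
        threadB.start();
        try {
            store.writeStarted.await();   // thread B now owns the monitor
            long start = System.nanoTime();
            store.enqueue();              // blocks until thread B returns
            long elapsed = (System.nanoTime() - start) / 1_000_000;
            threadB.join();
            return elapsed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("enqueue() was blocked for about "
            + measureBlockedMillis(200) + " ms");
    }
}
```

The caller of `enqueue()` waits for roughly the full duration of the write, even though `enqueue()` itself does no work.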
> Debugging RMAppNewlySavingTransition.transition, the thread stack shows that it is called from the AsyncDispatcher.dispatch method, whose code is shown below.
> {code:title=AsyncDispatcher.java|borderStyle=solid}
> protected void dispatch(Event event) {
>   //all events go thru this loop
>   if (LOG.isDebugEnabled()) {
>     LOG.debug("Dispatching the event " + event.getClass().getName() + "."
>         + event.toString());
>   }
>   Class<? extends Enum> type = event.getType().getDeclaringClass();
>   try{
>     EventHandler handler = eventDispatchers.get(type);
>     if(handler != null) {
>       handler.handle(event);
>     } else {
>       throw new Exception("No handler for registered for " + type);
>     }
>   } catch (Throwable t) {
>     //TODO Maybe log the state of the queue
>     LOG.fatal("Error in dispatcher thread", t);
>     // If serviceStop is called, we should exit this thread gracefully.
>     if (exitOnDispatchException
>         && (ShutdownHookManager.get().isShutdownInProgress()) == false
>         && stopped == false) {
>       Thread shutDownThread = new Thread(createShutDownThread());
>       shutDownThread.setName("AsyncDispatcher ShutDown handler");
>       shutDownThread.start();
>     }
>   }
> }
> {code}
> The code above shows that AsyncDispatcher.dispatch processes events of many different types.
> In fact, this AsyncDispatcher instance is ResourceManager.rmDispatcher, which is created in the ResourceManager.serviceInit method.
> Many event types and handlers are registered with ResourceManager.rmDispatcher.
> In the scenario above, when thread B blocks thread A, the processing of all the events queued behind it is blocked as well.
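To illustrate why one blocked handler delays unrelated events, here is a small sketch. It is not the AsyncDispatcher implementation: it assumes that a single-thread executor is a fair model of one dispatcher thread draining one event queue, and `delayOfNextEventMillis` is a hypothetical helper name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SingleDispatcherSketch {

    /**
     * Queues a slow event followed by an unrelated fast event and returns
     * how long the fast event waited before it was handled.
     */
    static long delayOfNextEventMillis(long slowHandlerMillis) {
        // Assumption: one executor thread models the single dispatcher thread.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            long start = System.nanoTime();
            Runnable slowEvent = () -> pause(slowHandlerMillis);
            Callable<Long> nextEvent =
                () -> (System.nanoTime() - start) / 1_000_000;
            dispatcher.submit(slowEvent);
            Future<Long> handledAt = dispatcher.submit(nextEvent);
            return handledAt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcher.shutdown();
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("next event handled after about "
            + delayOfNextEventMillis(200) + " ms");
    }
}
```

The second event has nothing to do with the first, yet it cannot start until the first handler returns, because both share the same thread.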
> In my test cluster there is only one queue. When a client submits 1000 applications concurrently, the cluster usage rate is 50% and many applications stay pending. If I disable ResourceManager recovery, the cluster usage reaches 100%.
> To solve this issue, I removed the synchronized modifier from some methods defined in RMStateStore.
> Instead, in these methods I use a dedicated lock object when calling dispatcher.getEventHandler().handle.
> With this change, the cluster usage rate reaches 100% and the whole cluster runs well.
> Please see the attached patch.
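The idea of a dedicated lock can be sketched as follows. This is only an illustration under assumptions, not the content of the attached patch: `Store`, `enqueueNewApplication`, `writeState`, and `enqueueLock` are hypothetical names, and a plain list stands in for the dispatcher's event queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class DedicatedLockSketch {

    /** Hypothetical store: enqueueing and writing use different locks. */
    static final class Store {
        private final Object enqueueLock = new Object();
        private final List<String> pending = new ArrayList<>();

        // No longer synchronized on the store itself.
        void enqueueNewApplication(String appId) {
            synchronized (enqueueLock) {
                pending.add(appId);
            }
        }

        // Still holds the store's monitor for the whole (slow) write.
        synchronized void writeState(CountDownLatch entered, CountDownLatch release) {
            entered.countDown();
            await(release);   // stand-in for the slow file system write
        }

        int pendingCount() {
            synchronized (enqueueLock) {
                return pending.size();
            }
        }
    }

    /** Enqueues one application while a writer thread holds the store monitor. */
    static int enqueueWhileWriterHoldsMonitor() {
        Store store = new Store();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread writer = new Thread(() -> store.writeState(entered, release));
        writer.start();
        await(entered);                          // writer is inside writeState
        store.enqueueNewApplication("app_1");    // returns without waiting for the writer
        int count = store.pendingCount();
        release.countDown();                     // let the writer finish
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("pending = " + enqueueWhileWriterHoldsMonitor());
    }
}
```

If `enqueueNewApplication` were declared synchronized instead, the call would wait for the writer, and in this sketch it would never return because the writer is only released afterwards.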
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)