You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/03/28 00:15:54 UTC

svn commit: r1461892 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ hadoop-yarn/hadoop...

Author: tucu
Date: Wed Mar 27 23:15:54 2013
New Revision: 1461892

URL: http://svn.apache.org/r1461892
Log:
YARN-24. Nodemanager fails to start if log aggregation enabled and namenode unavailable. (sandyr via tucu)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1461892&r1=1461891&r2=1461892&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Mar 27 23:15:54 2013
@@ -97,6 +97,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-209. Fix CapacityScheduler to trigger application-activation when
     the cluster capacity changes. (Zhijie Shen via vinodkv)
 
+    YARN-24. Nodemanager fails to start if log aggregation enabled and 
+    namenode unavailable. (sandyr via tucu)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1461892&r1=1461891&r2=1461892&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Mar 27 23:15:54 2013
@@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -97,7 +96,7 @@ public class LogAggregationService exten
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
 
   private final ExecutorService threadPool;
-
+  
   public LogAggregationService(Dispatcher dispatcher, Context context,
       DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
     super(LogAggregationService.class.getName());
@@ -129,7 +128,6 @@ public class LogAggregationService exten
     // NodeId is only available during start, the following cannot be moved
     // anywhere else.
     this.nodeId = this.context.getNodeId();
-    verifyAndCreateRemoteLogDir(getConfig());
     super.start();
   }
   
@@ -164,7 +162,7 @@ public class LogAggregationService exten
     }
   }
   
-  private void verifyAndCreateRemoteLogDir(Configuration conf) {
+  void verifyAndCreateRemoteLogDir(Configuration conf) {
     // Checking the existance of the TLD
     FileSystem remoteFS = null;
     try {
@@ -177,7 +175,7 @@ public class LogAggregationService exten
       remoteExists = remoteFS.exists(this.remoteRootLogDir);
     } catch (IOException e) {
       throw new YarnException("Failed to check for existence of remoteLogDir ["
-          + this.remoteRootLogDir + "]");
+          + this.remoteRootLogDir + "]", e);
     }
     if (remoteExists) {
       try {
@@ -191,8 +189,8 @@ public class LogAggregationService exten
         }
       } catch (IOException e) {
         throw new YarnException(
-            "Failed while attempting to check permissions for dir ["
-                + this.remoteRootLogDir + "]");
+            "Failed to check permissions for dir ["
+                + this.remoteRootLogDir + "]", e);
       }
     } else {
       LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
@@ -208,7 +206,6 @@ public class LogAggregationService exten
             + this.remoteRootLogDir + "]", e);
       }
     }
-
   }
 
   Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
@@ -296,6 +293,7 @@ public class LogAggregationService exten
       Map<ApplicationAccessType, String> appAcls) {
     ApplicationEvent eventResponse;
     try {
+      verifyAndCreateRemoteLogDir(getConfig());
       initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1461892&r1=1461891&r2=1461892&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Mar 27 23:15:54 2013
@@ -44,6 +44,7 @@ import junit.framework.Assert;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -79,7 +80,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.util.Build
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mortbay.util.MultiException;
 
 
@@ -393,7 +394,76 @@ public class TestLogAggregationService e
   
   @Test
   @SuppressWarnings("unchecked")
-  public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
+  public void testVerifyAndCreateRemoteDirsFailure()
+      throws Exception {
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+                                  super.dirsHandler));
+    logAggregationService.init(this.conf);
+    
+    YarnException e = new YarnException("KABOOM!");
+    doThrow(e)
+      .when(logAggregationService).verifyAndCreateRemoteLogDir(
+          any(Configuration.class));
+        
+    logAggregationService.start();
+    
+    // Now try to start an application
+    ApplicationId appId = BuilderUtils.newApplicationId(
+        System.currentTimeMillis(), (int)Math.random());
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null,
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
+        this.acls));
+    dispatcher.await();
+    
+    // Verify that it failed
+    ApplicationEvent[] expectedEvents = new ApplicationEvent[] {
+        new ApplicationEvent(appId, 
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
+    };
+    checkEvents(appEventHandler, expectedEvents, false,
+        "getType", "getApplicationID", "getDiagnostic");
+
+    Mockito.reset(logAggregationService);
+    
+    // Now try to start another one
+    ApplicationId appId2 = BuilderUtils.newApplicationId(
+        System.currentTimeMillis(), (int)Math.random());
+    File appLogDir =
+        new File(localLogDir, ConverterUtils.toString(appId2));
+    appLogDir.mkdir();
+    
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId2,
+        this.user, null,
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
+        this.acls));
+    dispatcher.await();
+    
+    // Verify that it worked
+    expectedEvents = new ApplicationEvent[] {
+        new ApplicationEvent(appId, // original failure
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), 
+        new ApplicationEvent(appId2, // success
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
+    };
+    checkEvents(appEventHandler, expectedEvents, false,
+        "getType", "getApplicationID", "getDiagnostic");
+    
+    logAggregationService.stop();
+  }
+  
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
 
     this.conf.set(YarnConfiguration.NM_LOG_DIRS,
         localLogDir.getAbsolutePath());