You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/03/07 00:46:02 UTC

svn commit: r1297797 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ hadoop-yar...

Author: bobby
Date: Tue Mar  6 23:46:02 2012
New Revision: 1297797

URL: http://svn.apache.org/viewvc?rev=1297797&view=rev
Log:
svn merge -c 1297796 from trunk to branch-0.23 FIXES: MAPREDUCE-3977. LogAggregationService leaks log aggregator objects (Jason Lowe via bobby)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1297797&r1=1297796&r2=1297797&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Mar  6 23:46:02 2012
@@ -203,6 +203,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth
     via bobby)
 
+    MAPREDUCE-3977. LogAggregationService leaks log aggregator objects
+    (Jason Lowe via bobby)
+
 Release 0.23.1 - 2012-02-17
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1297797&r1=1297796&r2=1297797&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Mar  6 23:46:02 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -139,12 +140,6 @@ public class LogAggregationService exten
     super.stop();
   }
   
-  
-
-  
-
-
-  
   private void verifyAndCreateRemoteLogDir(Configuration conf) {
     // Checking the existance of the TLD
     FileSystem remoteFS = null;
@@ -289,7 +284,7 @@ public class LogAggregationService exten
     createAppDir(user, appId, userUgi);
 
     // New application
-    AppLogAggregator appLogAggregator =
+    final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
@@ -303,7 +298,22 @@ public class LogAggregationService exten
     // aggregation.
 
     // Schedule the aggregator.
-    this.threadPool.execute(appLogAggregator);
+    Runnable aggregatorWrapper = new Runnable() {
+      public void run() {
+        try {
+          appLogAggregator.run();
+        } finally {
+          appLogAggregators.remove(appId);
+        }
+      }
+    };
+    this.threadPool.execute(aggregatorWrapper);
+  }
+
+  // for testing only
+  @Private
+  int getNumAggregators() {
+    return this.appLogAggregators.size();
   }
 
   private void stopContainer(ContainerId containerId, int exitCode) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1297797&r1=1297796&r2=1297797&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue Mar  6 23:46:02 2012
@@ -565,4 +565,38 @@ public class TestLogAggregationService e
 
     logAggregationService.stop();
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLogAggregatorCleanup() throws Exception {
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    // get the AppLogAggregationImpl thread to crash
+    LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, delSrvc,
+                                  mockedDirSvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    logAggregationService.handle(new LogHandlerAppStartedEvent(
+            application1, this.user, null,
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
+    dispatcher.await();
+    int timeToWait = 20 * 1000;
+    while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) {
+      Thread.sleep(100);
+      timeToWait -= 100;
+    }
+    Assert.assertEquals("Log aggregator failed to cleanup!", 0,
+        logAggregationService.getNumAggregators());
+  }
 }