You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/03/07 00:46:02 UTC
svn commit: r1297797 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/
hadoop-yar...
Author: bobby
Date: Tue Mar 6 23:46:02 2012
New Revision: 1297797
URL: http://svn.apache.org/viewvc?rev=1297797&view=rev
Log:
svn merge -c 1297796 from trunk to branch-0.23 FIXES: MAPREDUCE-3977. LogAggregationService leaks log aggregator objects (Jason Lowe via bobby)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1297797&r1=1297796&r2=1297797&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Mar 6 23:46:02 2012
@@ -203,6 +203,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth
via bobby)
+ MAPREDUCE-3977. LogAggregationService leaks log aggregator objects
+ (Jason Lowe via bobby)
+
Release 0.23.1 - 2012-02-17
NEW FEATURES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1297797&r1=1297796&r2=1297797&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Mar 6 23:46:02 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -139,12 +140,6 @@ public class LogAggregationService exten
super.stop();
}
-
-
-
-
-
-
private void verifyAndCreateRemoteLogDir(Configuration conf) {
// Checking the existance of the TLD
FileSystem remoteFS = null;
@@ -289,7 +284,7 @@ public class LogAggregationService exten
createAppDir(user, appId, userUgi);
// New application
- AppLogAggregator appLogAggregator =
+ final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
@@ -303,7 +298,22 @@ public class LogAggregationService exten
// aggregation.
// Schedule the aggregator.
- this.threadPool.execute(appLogAggregator);
+ Runnable aggregatorWrapper = new Runnable() {
+ public void run() {
+ try {
+ appLogAggregator.run();
+ } finally {
+ appLogAggregators.remove(appId);
+ }
+ }
+ };
+ this.threadPool.execute(aggregatorWrapper);
+ }
+
+ // for testing only
+ @Private
+ int getNumAggregators() {
+ return this.appLogAggregators.size();
}
private void stopContainer(ContainerId containerId, int exitCode) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1297797&r1=1297796&r2=1297797&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue Mar 6 23:46:02 2012
@@ -565,4 +565,38 @@ public class TestLogAggregationService e
logAggregationService.stop();
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLogAggregatorCleanup() throws Exception {
+ DeletionService delSrvc = mock(DeletionService.class);
+
+ // get the AppLogAggregatorImpl thread to crash
+ LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, delSrvc,
+ mockedDirSvc);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+ logAggregationService.handle(new LogHandlerAppStartedEvent(
+ application1, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
+ dispatcher.await();
+ int timeToWait = 20 * 1000;
+ while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) {
+ Thread.sleep(100);
+ timeToWait -= 100;
+ }
+ Assert.assertEquals("Log aggregator failed to cleanup!", 0,
+ logAggregationService.getNumAggregators());
+ }
}