You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/11 19:20:36 UTC

[15/42] hadoop git commit: YARN-4096. App local logs are leaked if log aggregation fails to initialize for the app. Contributed by Jason Lowe.

YARN-4096. App local logs are leaked if log aggregation fails to initialize for the app. Contributed by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/16b9037d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/16b9037d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/16b9037d

Branch: refs/heads/YARN-1197
Commit: 16b9037dc1300b8bdbe54ba7cd47c53fe16e93d8
Parents: 970daaa
Author: Zhihai Xu <zx...@apache.org>
Authored: Tue Sep 8 12:29:54 2015 -0700
Committer: Zhihai Xu <zx...@apache.org>
Committed: Tue Sep 8 12:29:54 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                    |  3 +++
 .../logaggregation/AppLogAggregator.java           |  2 ++
 .../logaggregation/AppLogAggregatorImpl.java       |  5 +++++
 .../logaggregation/LogAggregationService.java      | 14 +++++++++-----
 .../logaggregation/TestLogAggregationService.java  | 17 ++++++++++++-----
 5 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/16b9037d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6d3796a..7308075 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -898,6 +898,9 @@ Release 2.7.2 - UNRELEASED
     YARN-4087. Followup fixes after YARN-2019 regarding RM behavior when
     state-store error occurs. (Jian He via xgong)
 
+    YARN-4096. App local logs are leaked if log aggregation fails to initialize
+    for the app. (Jason Lowe via zxu)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16b9037d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
index 83c5d5a..0178699 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
@@ -27,4 +27,6 @@ public interface AppLogAggregator extends Runnable {
   void abortLogAggregation();
 
   void finishLogAggregation();
+
+  void disableLogAggregation();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16b9037d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 742b8a9..b2342c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -596,6 +596,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.notifyAll();
   }
 
+  @Override
+  public void disableLogAggregation() {
+    this.logAggregationDisabled = true;
+  }
+
   @Private
   @VisibleForTesting
   // This is only used for testing.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16b9037d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 259e9ae..6a6f101 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -363,19 +363,19 @@ public class LogAggregationService extends AbstractService implements
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
     // wait until check for existing aggregator to create dirs
+    YarnRuntimeException appDirException = null;
     try {
       // Create the app dir
       createAppDir(user, appId, userUgi);
     } catch (Exception e) {
-      appLogAggregators.remove(appId);
-      closeFileSystems(userUgi);
+      appLogAggregator.disableLogAggregation();
       if (!(e instanceof YarnRuntimeException)) {
-        e = new YarnRuntimeException(e);
+        appDirException = new YarnRuntimeException(e);
+      } else {
+        appDirException = (YarnRuntimeException)e;
       }
-      throw (YarnRuntimeException)e;
     }
 
-
     // TODO Get the user configuration for the list of containers that need log
     // aggregation.
 
@@ -391,6 +391,10 @@ public class LogAggregationService extends AbstractService implements
       }
     };
     this.threadPool.execute(aggregatorWrapper);
+
+    if (appDirException != null) {
+      throw appDirException;
+    }
   }
 
   protected void closeFileSystems(final UserGroupInformation userUgi) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16b9037d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 77d75ca..77c6e3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -731,9 +731,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-        
+
+    DeletionService spyDelSrvc = spy(this.delSrvc);
     LogAggregationService logAggregationService = spy(
-        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+        new LogAggregationService(dispatcher, this.context, spyDelSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
@@ -741,6 +742,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
           (int) (Math.random() * 1000));
+
+    File appLogDir =
+        new File(localLogDir, ConverterUtils.toString(appId));
+    appLogDir.mkdir();
+
     Exception e = new RuntimeException("KABOOM!");
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
@@ -759,9 +765,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");
-    // filesystems may have been instantiated
-    verify(logAggregationService).closeFileSystems(
-        any(UserGroupInformation.class));
 
     // verify trying to collect logs for containers/apps we don't know about
     // doesn't blow up and tear down the NM
@@ -774,6 +777,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
+    verify(spyDelSrvc).delete(eq(user), any(Path.class),
+        Mockito.<Path>anyVararg());
+    verify(logAggregationService).closeFileSystems(
+        any(UserGroupInformation.class));
   }
 
   private void writeContainerLogs(File appLogDir, ContainerId containerId,