You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/12/07 00:53:52 UTC

[07/50] [abbrv] hadoop git commit: YARN-7495. Improve robustness of the AggregatedLogDeletionService. Contributed by Jonathan Eagles

YARN-7495. Improve robustness of the AggregatedLogDeletionService. Contributed by Jonathan Eagles


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5cfaee2e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5cfaee2e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5cfaee2e

Branch: refs/heads/HDFS-7240
Commit: 5cfaee2e6db8b2ac55708de0968ff5539ee3bd76
Parents: 75a3ab8
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Nov 30 12:39:18 2017 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Nov 30 12:39:18 2017 -0600

----------------------------------------------------------------------
 .../AggregatedLogDeletionService.java           | 90 ++++++++++++--------
 .../TestAggregatedLogDeletionService.java       | 68 +++++++++++++++
 2 files changed, 122 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfaee2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index a80f9d7..562bd2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -85,49 +85,67 @@ public class AggregatedLogDeletionService extends AbstractService {
             deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
           }
         }
-      } catch (IOException e) {
-        logIOException("Error reading root log dir this deletion " +
-        		"attempt is being aborted", e);
+      } catch (Throwable t) {
+        logException("Error reading root log dir this deletion " +
+            "attempt is being aborted", t);
       }
       LOG.info("aggregated log deletion finished.");
     }
     
     private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, 
         FileSystem fs, ApplicationClientProtocol rmClient) {
+      FileStatus[] appDirs;
       try {
-        for(FileStatus appDir : fs.listStatus(dir)) {
-          if(appDir.isDirectory() && 
-              appDir.getModificationTime() < cutoffMillis) {
-            boolean appTerminated =
-                isApplicationTerminated(ApplicationId.fromString(appDir
-                  .getPath().getName()), rmClient);
-            if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
-              try {
-                LOG.info("Deleting aggregated logs in "+appDir.getPath());
-                fs.delete(appDir.getPath(), true);
-              } catch (IOException e) {
-                logIOException("Could not delete "+appDir.getPath(), e);
-              }
-            } else if (!appTerminated){
-              try {
-                for(FileStatus node: fs.listStatus(appDir.getPath())) {
-                  if(node.getModificationTime() < cutoffMillis) {
-                    try {
-                      fs.delete(node.getPath(), true);
-                    } catch (IOException ex) {
-                      logIOException("Could not delete "+appDir.getPath(), ex);
-                    }
-                  }
+        appDirs = fs.listStatus(dir);
+      } catch (IOException e) {
+        logException("Could not read the contents of " + dir, e);
+        return;
+      }
+      for (FileStatus appDir : appDirs) {
+        deleteAppDirLogs(cutoffMillis, fs, rmClient, appDir);
+      }
+    }
+
+    private static void deleteAppDirLogs(long cutoffMillis, FileSystem fs,
+                                         ApplicationClientProtocol rmClient,
+                                         FileStatus appDir) {
+      try {
+        if (appDir.isDirectory() &&
+            appDir.getModificationTime() < cutoffMillis) {
+          ApplicationId appId = ApplicationId.fromString(
+              appDir.getPath().getName());
+          boolean appTerminated = isApplicationTerminated(appId, rmClient);
+          if (!appTerminated) {
+            // Application is still running
+            FileStatus[] logFiles;
+            try {
+              logFiles = fs.listStatus(appDir.getPath());
+            } catch (IOException e) {
+              logException("Error reading the contents of "
+                  + appDir.getPath(), e);
+              return;
+            }
+            for (FileStatus node : logFiles) {
+              if (node.getModificationTime() < cutoffMillis) {
+                try {
+                  fs.delete(node.getPath(), true);
+                } catch (IOException ex) {
+                  logException("Could not delete " + appDir.getPath(), ex);
                 }
-              } catch(IOException e) {
-                logIOException(
-                  "Error reading the contents of " + appDir.getPath(), e);
               }
             }
+          } else if (shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
+            // Application is no longer running
+            try {
+              LOG.info("Deleting aggregated logs in " + appDir.getPath());
+              fs.delete(appDir.getPath(), true);
+            } catch (IOException e) {
+              logException("Could not delete " + appDir.getPath(), e);
+            }
           }
         }
-      } catch (IOException e) {
-        logIOException("Could not read the contents of " + dir, e);
+      } catch (Exception e) {
+        logException("Could not delete " + appDir.getPath(), e);
       }
     }
 
@@ -142,7 +160,7 @@ public class AggregatedLogDeletionService extends AbstractService {
           }
         }
       } catch(IOException e) {
-        logIOException("Error reading the contents of " + dir.getPath(), e);
+        logException("Error reading the contents of " + dir.getPath(), e);
         shouldDelete = false;
       }
       return shouldDelete;
@@ -172,14 +190,14 @@ public class AggregatedLogDeletionService extends AbstractService {
     }
   }
   
-  private static void logIOException(String comment, IOException e) {
-    if(e instanceof AccessControlException) {
-      String message = e.getMessage();
+  private static void logException(String comment, Throwable t) {
+    if(t instanceof AccessControlException) {
+      String message = t.getMessage();
       //TODO fix this after HADOOP-8661
       message = message.split("\n")[0];
       LOG.warn(comment + " " + message);
     } else {
-      LOG.error(comment, e);
+      LOG.error(comment, t);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfaee2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 026996e..4e2d302 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -385,6 +385,74 @@ public class TestAggregatedLogDeletionService {
     deletionSvc.stop();
   }
 
+  @Test
+  public void testRobustLogDeletion() throws Exception {
+    final long RETENTION_SECS = 10 * 24 * 3600;
+
+    String root = "mockfs://foo/";
+    String remoteRootLogDir = root+"tmp/logs";
+    String suffix = "logs";
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class,
+        FileSystem.class);
+    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+        "1");
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+
+    // prevent us from picking up the same mockfs instance from another test
+    FileSystem.closeAll();
+    Path rootPath = new Path(root);
+    FileSystem rootFs = rootPath.getFileSystem(conf);
+    FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
+
+    Path remoteRootLogPath = new Path(remoteRootLogDir);
+
+    Path userDir = new Path(remoteRootLogPath, "me");
+    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
+
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
+        new FileStatus[]{userDirStatus});
+
+    Path userLogDir = new Path(userDir, suffix);
+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    Path app1Dir = new Path(userLogDir, appId1.toString());
+    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
+    ApplicationId appId2 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    Path app2Dir = new Path(userLogDir, "application_a");
+    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
+    ApplicationId appId3 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 3);
+    Path app3Dir = new Path(userLogDir, appId3.toString());
+    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
+
+    when(mockFs.listStatus(userLogDir)).thenReturn(
+        new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
+
+    when(mockFs.listStatus(app1Dir)).thenThrow(
+        new RuntimeException("Should Be Caught and Logged"));
+    Path app3Log3 = new Path(app3Dir, "host1");
+    FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
+    when(mockFs.listStatus(app3Dir)).thenReturn(
+        new FileStatus[]{app3Log3Status});
+
+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1, appId3));
+
+    ApplicationClientProtocol rmClient =
+        createMockRMClient(finishedApplications, null);
+    AggregatedLogDeletionService.LogDeletionTask deletionTask =
+        new AggregatedLogDeletionService.LogDeletionTask(conf,
+            RETENTION_SECS,
+            rmClient);
+    deletionTask.run();
+    verify(mockFs).delete(app3Dir, true);
+  }
+
   static class MockFileSystem extends FilterFileSystem {
     MockFileSystem() {
       super(mock(FileSystem.class));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org