You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/06/14 12:35:42 UTC

[GitHub] [hadoop] szilard-nemeth commented on a diff in pull request #4430: YARN-11176. Refactor TestAggregatedLogDeletionService

szilard-nemeth commented on code in PR #4430:
URL: https://github.com/apache/hadoop/pull/4430#discussion_r896760612


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java:
##########
@@ -44,514 +44,391 @@
 import org.junit.Test;
 import org.junit.Assert;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 public class TestAggregatedLogDeletionService {
-  
+
+  private static final String T_FILE = "TFile";
+  private static final String USER_ME = "me";
+  private static final String DIR_HOST1 = "host1";
+  private static final String DIR_HOST2 = "host2";
+
+  private static final String ROOT = "mockfs://foo/";
+  private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
+  private static final String SUFFIX = "logs";
+  private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
+  private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
+
+  private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
+                                                                     ApplicationId appId,
+                                                                     String user, String suffix,
+                                                                     long modificationTime) {
+    Path path = LogAggregationUtils.getRemoteAppLogDir(
+            remoteRootLogDir, appId, user, suffix);
+    FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
+    return new PathWithFileStatus(path, fileStatus);
+  }
+
+  private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
+    return new FileStatus(0, true, 0, 0, modificationTime, path);
+  }
+
+  private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
+                                                                    long modificationTime) {
+    Path logPath = new Path(baseDir, childDir);
+    FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
+    return new PathWithFileStatus(logPath, fStatus);
+  }
+
+  private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
+                                                                   long modificationTime) {
+    Path logPath = new Path(baseDir, childDir);
+    FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
+    return new PathWithFileStatus(logPath, fStatus);
+  }
+
+  private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
+                                                                            String user,
+                                                                            String suffix,
+                                                                            ApplicationId appId,
+                                                                            long modificationTime) {
+    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
+    FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
+    return new PathWithFileStatus(bucketDir, fStatus);
+  }
+
+  private static FileStatus createFileStatusWithLengthForFile(long length,
+                                                              long modificationTime,
+                                                              Path logPath) {
+    return new FileStatus(length, false, 1, 1, modificationTime, logPath);
+  }
+
+  private static FileStatus createFileStatusWithLengthForDir(long length,
+                                                             long modificationTime,
+                                                             Path logPath) {
+    return new FileStatus(length, true, 1, 1, modificationTime, logPath);
+  }
+
   @Before
   public void closeFilesystems() throws IOException {
     // prevent the same mockfs instance from being reused due to FS cache
     FileSystem.closeAll();
   }
 
+  private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+            retainCheckIntervalSeconds);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
+    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
+            LogAggregationTFileController.class.getName());
+    return conf;
+  }
+
   @Test
   public void testDeletion() throws Exception {
     long now = System.currentTimeMillis();
-    long toDeleteTime = now - (2000*1000);
-    long toKeepTime = now - (1500*1000);
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root+"tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    final Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
+    long toDeleteTime = now - (2000 * 1000);
+    long toKeepTime = now - (1500 * 1000);
 
+    Configuration conf = setupConfiguration(1800, -1);
 
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
     
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
-    
-    Path userDir = new Path(remoteRootLogPath, "me");
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir); 
-    
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[]{userDirStatus});
-
-    ApplicationId appId1 =
-        ApplicationId.newInstance(now, 1);
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus suffixDirStatus = new FileStatus(0, true,
-        0, 0, toDeleteTime, suffixDir);
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
-        remoteRootLogPath, "me", suffix, appId1);
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
-        0, toDeleteTime, bucketDir);
-    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId1, "me", suffix);
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
-        toDeleteTime, app1Dir);
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+            toKeepTime);
+
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+
+    ApplicationId appId1 = ApplicationId.newInstance(now, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(now, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(now, 3);
+    ApplicationId appId4 = ApplicationId.newInstance(now, 4);
+
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+            toDeleteTime);
+    PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
+            toDeleteTime);
+
+    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+            USER_ME, SUFFIX, toDeleteTime);
+    PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+            USER_ME, SUFFIX, toDeleteTime);
+    PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
+            USER_ME, SUFFIX, toDeleteTime);
+    PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
+            USER_ME, SUFFIX, toDeleteTime);
+
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
+            app1.fileStatus, app2.fileStatus, 
+            app3.fileStatus, app4.fileStatus});
     
-    ApplicationId appId2 =
-        ApplicationId.newInstance(now, 2);
-    Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId2, "me", suffix);
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
-        toDeleteTime, app2Dir);
-    
-    ApplicationId appId3 =
-        ApplicationId.newInstance(now, 3);
-    Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId3, "me", suffix);
-    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
-        toDeleteTime, app3Dir);
-    
-    ApplicationId appId4 =
-        ApplicationId.newInstance(now, 4);
-    Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId4, "me", suffix);
-    FileStatus app4DirStatus =
-        new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
-
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixDirStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus});
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus, app2DirStatus,
-            app3DirStatus, app4DirStatus});
-    
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[]{});
-
-
-    Path app2Log1 = new Path(app2Dir, "host1");
-    FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
-    
-    Path app2Log2 = new Path(app2Dir, "host2");
-    FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
-    
-    when(mockFs.listStatus(app2Dir)).thenReturn(
-        new FileStatus[]{app2Log1Status, app2Log2Status});
-    
-    Path app3Log1 = new Path(app3Dir, "host1");
-    FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
-    
-    Path app3Log2 = new Path(app3Dir, "host2");
-    FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
-    
-    when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
-    
-    when(mockFs.listStatus(app3Dir)).thenReturn(
-        new FileStatus[]{app3Log1Status, app3Log2Status});
-    
-    Path app4Log1 = new Path(app4Dir, "host1");
-    FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
-    
-    Path app4Log2 = new Path(app4Dir, "host2");
-    FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
-        toKeepTime, app4Log2);
-
-    when(mockFs.listStatus(app4Dir)).thenReturn(
-        new FileStatus[]{app4Log1Status, app4Log2Status});
-
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
-    final List<ApplicationId> runningApplications =
-        Collections.unmodifiableList(Arrays.asList(appId4));
+    PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+            toDeleteTime);
+    PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2,
+            toKeepTime);
+    PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
+            toDeleteTime);
+    PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
+            toDeleteTime);
+    PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
+            toDeleteTime);
+    PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
+
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
+    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
+            app2Log2.fileStatus});
+    when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
+            app3Log2.fileStatus});
+    when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
+            app4Log2.fileStatus});
+    when(mockFs.delete(app3.path, true)).thenThrow(
+            new AccessControlException("Injected Error\nStack Trace :("));
+
+    final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
+            Arrays.asList(appId1, appId2, appId3));
+    final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
 
     AggregatedLogDeletionService deletionService =
-        new AggregatedLogDeletionService() {
-          @Override
-          protected ApplicationClientProtocol createRMClient()
-              throws IOException {
-            try {
-              return createMockRMClient(finishedApplications,
-                runningApplications);
-            } catch (Exception e) {
-              throw new IOException(e);
-            }
-          }
-          @Override
-          protected void stopRMClient() {
-            // DO NOTHING
-          }
-        };
+            new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
     deletionService.init(conf);
     deletionService.start();
 
-    verify(mockFs, timeout(2000)).delete(app1Dir, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
-    verify(mockFs, timeout(2000)).delete(app3Dir, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
-    verify(mockFs, timeout(2000)).delete(app4Log1, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
+    int timeout = 2000;
+    verify(mockFs, timeout(timeout)).delete(app1.path, true);
+    verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
+    verify(mockFs, timeout(timeout)).delete(app3.path, true);
+    verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
+    verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
+    verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
 
     deletionService.stop();
   }
 
   @Test
   public void testRefreshLogRetentionSettings() throws Exception {
     long now = System.currentTimeMillis();
-    //time before 2000 sec
     long before2000Secs = now - (2000 * 1000);
-    //time before 50 sec
     long before50Secs = now - (50 * 1000);
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root + "tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    final Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        "1");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
-
-
-    Path rootPath = new Path(root);
+    int checkIntervalSeconds = 2;
+    int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
+
+    Configuration conf = setupConfiguration(1800, 1);
+
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
 
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
-
-    Path userDir = new Path(remoteRootLogPath, "me");
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
-        userDir);
+    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
 
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[] { userDirStatus });
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+            before50Secs);
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+            before50Secs);
+    PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+            USER_ME, SUFFIX, appId1, before50Secs);
 
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
-        suffixDir);
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
 
-    ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     //Set time last modified of app1Dir directory and its files to before2000Secs 
-    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId1, "me", suffix);
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
-        remoteRootLogPath, "me", suffix, appId1);
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
-        0, before50Secs, bucketDir);
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
-        app1Dir);
-    
-    ApplicationId appId2 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    //Set time last modified of app1Dir directory and its files to before50Secs 
-    Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId2, "me", suffix);
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
-        app2Dir);
+    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+            USER_ME, SUFFIX, before2000Secs);
 
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixStatus });
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus });
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus, app2DirStatus });
-
-    Path app1Log1 = new Path(app1Dir, "host1");
-    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
-        app1Log1);
+    //Set time last modified of app2Dir directory and its files to before50Secs
+    PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+            USER_ME, SUFFIX, before50Secs);
 
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[] { app1Log1Status });
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
+            app2.fileStatus});
 
-    Path app2Log1 = new Path(app2Dir, "host1");
-    FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
-        app2Log1);
+    PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
+            before2000Secs);
+    PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+            before50Secs);
 
-    when(mockFs.listStatus(app2Dir)).thenReturn(
-        new FileStatus[] { app2Log1Status });
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
+    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
 
     final List<ApplicationId> finishedApplications =
         Collections.unmodifiableList(Arrays.asList(appId1, appId2));
 
-    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
-      @Override
-      protected Configuration createConf() {
-        return conf;
-      }
-      @Override
-      protected ApplicationClientProtocol createRMClient()
-          throws IOException {
-        try {
-          return createMockRMClient(finishedApplications, null);
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
-      }
-      @Override
-      protected void stopRMClient() {
-        // DO NOTHING
-      }
-    };
+    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+            finishedApplications, conf);
     
     deletionSvc.init(conf);
     deletionSvc.start();
     
     //app1Dir would be deleted since its done above log retention period
-    verify(mockFs, timeout(10000)).delete(app1Dir, true);
-    //app2Dir is not expected to be deleted since its below the threshold
-    verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
-
-    //Now,lets change the confs
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        "2");
+    verify(mockFs, timeout(10000)).delete(app1.path, true);
+    //app2Dir is not expected to be deleted since it is below the threshold
+    verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
+
+    //Now, let's change the confs
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+            checkIntervalSeconds);
     //We have not called refreshLogSettings,hence don't expect to see the changed conf values
-    Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+    assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
     
     //refresh the log settings
     deletionSvc.refreshLogRetentionSettings();
 
     //Check interval time should reflect the new value
-    Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+    Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
     //app2Dir should be deleted since it falls above the threshold
-    verify(mockFs, timeout(10000)).delete(app2Dir, true);
+    verify(mockFs, timeout(10000)).delete(app2.path, true);
     deletionSvc.stop();
   }
   
   @Test
   public void testCheckInterval() throws Exception {
-    long RETENTION_SECS = 10 * 24 * 3600;
     long now = System.currentTimeMillis();
-    long toDeleteTime = now - RETENTION_SECS*1000;
-
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root+"tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
+    long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
 
+    Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
 
     // prevent us from picking up the same mockfs instance from another test
     FileSystem.closeAll();
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
 
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
-
-    Path userDir = new Path(remoteRootLogPath, "me");
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
 
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[]{userDirStatus});
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
 
-    ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
-        suffixDir);
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
-        remoteRootLogPath, "me", suffix, appId1);
-    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId1, "me", suffix);
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
-        0, now, bucketDir);
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
 
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
+    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+            USER_ME, SUFFIX, appId1, now);
 
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixDirStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus});
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus});
+    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+            USER_ME, SUFFIX, now);
+    PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
 
-    Path app1Log1 = new Path(app1Dir, "host1");
-    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
 
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[]{app1Log1Status});
+    final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
 
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1));
-
-    AggregatedLogDeletionService deletionSvc =
-        new AggregatedLogDeletionService() {
-      @Override
-      protected ApplicationClientProtocol createRMClient()
-          throws IOException {
-        try {
-          return createMockRMClient(finishedApplications, null);
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
-      }
-      @Override
-      protected void stopRMClient() {
-        // DO NOTHING
-      }
-    };
+    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+            finishedApplications);
     deletionSvc.init(conf);
     deletionSvc.start();
  
     verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
-    verify(mockFs, never()).delete(app1Dir, true);
-
-    // modify the timestamp of the logs and verify it's picked up quickly
-    bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
-    app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
-    app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixDirStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus });
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus });
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[]{app1Log1Status});
-
-    verify(mockFs, timeout(10000)).delete(app1Dir, true);
+    verify(mockFs, never()).delete(app1.path, true);
+
+    // modify the timestamp of the logs and verify if it's picked up quickly
+    app1.changeModificationTime(toDeleteTime);
+    app1Log1.changeModificationTime(toDeleteTime);
+    bucketDir.changeModificationTime(toDeleteTime);
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
+
+    verify(mockFs, timeout(10000)).delete(app1.path, true);
 
     deletionSvc.stop();
   }
 
   @Test
   public void testRobustLogDeletion() throws Exception {
-    final long RETENTION_SECS = 10 * 24 * 3600;
-
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root+"tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class,
-        FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        "1");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
+    Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
 
     // prevent us from picking up the same mockfs instance from another test
     FileSystem.closeAll();
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
 
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
-
-    Path userDir = new Path(remoteRootLogPath, "me");
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
-    FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
-    Path bucketDir = new Path(suffixDir, String.valueOf(0));
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
-
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[]{userDirStatus});
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[]{suffixStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[]{bucketDirStatus});
-
-    ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path app1Dir = new Path(bucketDir, appId1.toString());
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
-    ApplicationId appId2 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    Path app2Dir = new Path(bucketDir, "application_a");
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
-    ApplicationId appId3 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 3);
-    Path app3Dir = new Path(bucketDir, appId3.toString());
-    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
-
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
-    when(mockFs.listStatus(app2Dir)).thenReturn(
-            new FileStatus[]{});
-
-    when(mockFs.listStatus(app1Dir)).thenThrow(
-        new RuntimeException("Should Be Caught and Logged"));
-    Path app3Log3 = new Path(app3Dir, "host1");
-    FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
-    when(mockFs.listStatus(app3Dir)).thenReturn(
-        new FileStatus[]{app3Log3Status});
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
 
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1, appId3));
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
+    PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
+
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
 
-    ApplicationClientProtocol rmClient =
-        createMockRMClient(finishedApplications, null);
+    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
+
+    PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
+    PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);

Review Comment:
   This was the same as in the original code, so I didn't want to alter the behaviour.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org