You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by qu...@apache.org on 2022/06/20 12:12:59 UTC

[hadoop] branch trunk updated: YARN-11182. Refactor TestAggregatedLogDeletionService: 2nd phase. Contributed by Szilard Nemeth.

This is an automated email from the ASF dual-hosted git repository.

quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d08ffa769d YARN-11182. Refactor TestAggregatedLogDeletionService: 2nd phase. Contributed by Szilard Nemeth.
5d08ffa769d is described below

commit 5d08ffa769d9dd2ac533e1e9d249788f70191555
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Mon Jun 20 14:12:36 2022 +0200

    YARN-11182. Refactor TestAggregatedLogDeletionService: 2nd phase. Contributed by Szilard Nemeth.
---
 .../AggregatedLogDeletionService.java              |   4 +-
 .../logaggregation/LogAggregationTestUtils.java    |  68 +++
 .../TestAggregatedLogDeletionService.java          | 514 ++++++---------------
 .../TestLogAggregationFileControllerFactory.java   |  76 +--
 .../AggregatedLogDeletionServiceForTest.java       |  67 +++
 .../logaggregation/testutils/FileStatusUtils.java  |  76 +++
 .../testutils/LogAggregationTestcase.java          | 421 +++++++++++++++++
 .../testutils/LogAggregationTestcaseBuilder.java   | 172 +++++++
 .../testutils/MockRMClientUtils.java               |  72 +++
 .../testutils/PathWithFileStatus.java              |  45 ++
 10 files changed, 1082 insertions(+), 433 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index 9427068cfc5..eb6466a3a02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -59,7 +59,7 @@ public class AggregatedLogDeletionService extends AbstractService {
   private long checkIntervalMsecs;
   private LogDeletionTask task;
   
-  static class LogDeletionTask extends TimerTask {
+  public static class LogDeletionTask extends TimerTask {
     private Configuration conf;
     private long retentionMillis;
     private String suffix = null;
@@ -101,7 +101,7 @@ public class AggregatedLogDeletionService extends AbstractService {
           }
         }
       } catch (Throwable t) {
-        logException("Error reading root log dir this deletion " +
+        logException("Error reading root log dir, this deletion " +
             "attempt is being aborted", t);
       }
       LOG.info("aggregated log deletion finished.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/LogAggregationTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/LogAggregationTestUtils.java
new file mode 100644
index 00000000000..3cd563a6489
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/LogAggregationTestUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+
+import java.util.List;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
+
+
+public class LogAggregationTestUtils {
+  public static final String REMOTE_LOG_ROOT = "target/app-logs/";
+
+  public static void enableFileControllers(Configuration conf,
+          List<Class<? extends LogAggregationFileController>> fileControllers,
+          List<String> fileControllerNames) {
+    enableFcs(conf, REMOTE_LOG_ROOT, fileControllers, fileControllerNames);
+  }
+
+  public static void enableFileControllers(Configuration conf,
+                                           String remoteLogRoot,
+                                           List<Class<? extends LogAggregationFileController>> fileControllers,
+                                           List<String> fileControllerNames) {
+    enableFcs(conf, remoteLogRoot, fileControllers, fileControllerNames);
+  }
+
+
+  private static void enableFcs(Configuration conf,
+                                String remoteLogRoot,
+                                List<Class<? extends LogAggregationFileController>> fileControllers,
+                                List<String> fileControllerNames) {
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+            StringUtils.join(fileControllerNames, ","));
+    for (int i = 0; i < fileControllers.size(); i++) {
+      Class<? extends LogAggregationFileController> fileController = fileControllers.get(i);
+      String controllerName = fileControllerNames.get(i);
+
+      conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, controllerName),
+              fileController, LogAggregationFileController.class);
+      conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, controllerName),
+              remoteLogRoot + controllerName + "/");
+      conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, controllerName),
+              controllerName);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index f855f9181cd..13a9afa84e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -18,96 +18,54 @@
 
 package org.apache.hadoop.yarn.logaggregation;
 
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
+import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcase;
+import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder;
+import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
+import org.apache.log4j.Level;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Assert;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
 
-public class TestAggregatedLogDeletionService {
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
+import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
+import static org.mockito.Mockito.mock;
 
+public class TestAggregatedLogDeletionService {
   private static final String T_FILE = "TFile";
+  private static final String I_FILE = "IFile";
   private static final String USER_ME = "me";
   private static final String DIR_HOST1 = "host1";
   private static final String DIR_HOST2 = "host2";
 
   private static final String ROOT = "mockfs://foo/";
-  private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
+  private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs/";
   private static final String SUFFIX = "logs";
-  private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
   private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
 
-  private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
-                                                                     ApplicationId appId,
-                                                                     String user, String suffix,
-                                                                     long modificationTime) {
-    Path path = LogAggregationUtils.getRemoteAppLogDir(
-            remoteRootLogDir, appId, user, suffix);
-    FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
-    return new PathWithFileStatus(path, fileStatus);
-  }
-
-  private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
-    return new FileStatus(0, true, 0, 0, modificationTime, path);
-  }
-
-  private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
-                                                                    long modificationTime) {
-    Path logPath = new Path(baseDir, childDir);
-    FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
-    return new PathWithFileStatus(logPath, fStatus);
-  }
-
-  private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
-                                                                   long modificationTime) {
-    Path logPath = new Path(baseDir, childDir);
-    FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
-    return new PathWithFileStatus(logPath, fStatus);
-  }
-
-  private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
-                                                                            String user,
-                                                                            String suffix,
-                                                                            ApplicationId appId,
-                                                                            long modificationTime) {
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
-    FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
-    return new PathWithFileStatus(bucketDir, fStatus);
-  }
-
-  private static FileStatus createFileStatusWithLengthForFile(long length,
-                                                              long modificationTime,
-                                                              Path logPath) {
-    return new FileStatus(length, false, 1, 1, modificationTime, logPath);
-  }
+  private static final List<Class<? extends LogAggregationFileController>>
+          ALL_FILE_CONTROLLERS = Arrays.asList(
+          LogAggregationIndexedFileController.class,
+          LogAggregationTFileController.class);
+  public static final List<String> ALL_FILE_CONTROLLER_NAMES = Arrays.asList(I_FILE, T_FILE);
 
-  private static FileStatus createFileStatusWithLengthForDir(long length,
-                                                             long modificationTime,
-                                                             Path logPath) {
-    return new FileStatus(length, true, 1, 1, modificationTime, logPath);
+  @BeforeClass
+  public static void beforeClass() {
+    org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
   }
 
   @Before
@@ -138,80 +96,34 @@ public class TestAggregatedLogDeletionService {
     long toKeepTime = now - (1500 * 1000);
 
     Configuration conf = setupConfiguration(1800, -1);
-
-    Path rootPath = new Path(ROOT);
-    FileSystem rootFs = rootPath.getFileSystem(conf);
-    FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
-    
-    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
-    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
-            toKeepTime);
-
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
-
-    ApplicationId appId1 = ApplicationId.newInstance(now, 1);
-    ApplicationId appId2 = ApplicationId.newInstance(now, 2);
-    ApplicationId appId3 = ApplicationId.newInstance(now, 3);
-    ApplicationId appId4 = ApplicationId.newInstance(now, 4);
-
-    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
-            toDeleteTime);
-    PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
-            toDeleteTime);
-
-    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
-            USER_ME, SUFFIX, toDeleteTime);
-    PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
-            USER_ME, SUFFIX, toDeleteTime);
-    PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
-            USER_ME, SUFFIX, toDeleteTime);
-    PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
-            USER_ME, SUFFIX, toDeleteTime);
-
-    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
-    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
-    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
-        app1.fileStatus, app2.fileStatus, app3.fileStatus, app4.fileStatus});
-    
-    PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
-        toDeleteTime);
-    PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2, toKeepTime);
-    PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
-        toDeleteTime);
-    PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
-        toDeleteTime);
-    PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
-        toDeleteTime);
-    PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
-
-    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
-    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
-        app2Log2.fileStatus});
-    when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
-        app3Log2.fileStatus});
-    when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
-        app4Log2.fileStatus});
-    when(mockFs.delete(app3.path, true)).thenThrow(
-            new AccessControlException("Injected Error\nStack Trace :("));
-
-    final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
-            Arrays.asList(appId1, appId2, appId3));
-    final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
-
-    AggregatedLogDeletionService deletionService =
-            new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
-    deletionService.init(conf);
-    deletionService.start();
-
-    int timeout = 2000;
-    verify(mockFs, timeout(timeout)).delete(app1.path, true);
-    verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
-    verify(mockFs, timeout(timeout)).delete(app3.path, true);
-    verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
-    verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
-    verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
-
-    deletionService.stop();
+    long timeout = 2000L;
+    LogAggregationTestcaseBuilder.create(conf)
+            .withRootPath(ROOT)
+            .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
+            .withUserDir(USER_ME, toKeepTime)
+            .withSuffixDir(SUFFIX, toDeleteTime)
+            .withBucketDir(toDeleteTime)
+            .withApps(Lists.newArrayList(
+                    new AppDescriptor(toDeleteTime, Lists.newArrayList()),
+                    new AppDescriptor(toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toKeepTime))),
+                    new AppDescriptor(toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toDeleteTime))),
+                    new AppDescriptor(toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toKeepTime)))))
+            .withFinishedApps(1, 2, 3)
+            .withRunningApps(4)
+            .injectExceptionForAppDirDeletion(3)
+            .build()
+            .setupAndRunDeletionService()
+            .verifyAppDirsDeleted(timeout, 1, 3)
+            .verifyAppDirsNotDeleted(timeout, 2, 4)
+            .verifyAppFileDeleted(4, 1, timeout)
+            .verifyAppFileNotDeleted(4, 2, timeout)
+            .teardown();
   }
 
   @Test
@@ -224,74 +136,47 @@ public class TestAggregatedLogDeletionService {
 
     Configuration conf = setupConfiguration(1800, 1);
 
-    Path rootPath = new Path(ROOT);
-    FileSystem rootFs = rootPath.getFileSystem(conf);
-    FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
-
-    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
-
-    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
-
-    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
-            before50Secs);
-    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
-            before50Secs);
-    PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
-            USER_ME, SUFFIX, appId1, before50Secs);
-
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
-
-    //Set time last modified of app1Dir directory and its files to before2000Secs 
-    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
-            USER_ME, SUFFIX, before2000Secs);
-
-    //Set time last modified of app1Dir directory and its files to before50Secs
-    PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
-            USER_ME, SUFFIX, before50Secs);
-
-    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
-    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
-    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
-            app2.fileStatus});
-
-    PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
-            before2000Secs);
-    PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
-            before50Secs);
-
-    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
-    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
-
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1, appId2));
-
-    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
-            finishedApplications, conf);
-    
-    deletionSvc.init(conf);
-    deletionSvc.start();
+    LogAggregationTestcase testcase = LogAggregationTestcaseBuilder.create(conf)
+            .withRootPath(ROOT)
+            .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
+            .withUserDir(USER_ME, before50Secs)
+            .withSuffixDir(SUFFIX, before50Secs)
+            .withBucketDir(before50Secs)
+            .withApps(Lists.newArrayList(
+                    //Set time last modified of app1Dir directory and its files to before2000Secs
+                    new AppDescriptor(before2000Secs, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, before2000Secs))),
+                    //Set time last modified of app2Dir directory and its files to before50Secs
+                    new AppDescriptor(before50Secs, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, before50Secs))))
+            )
+            .withFinishedApps(1, 2)
+            .withRunningApps()
+            .build();
     
-    //app1Dir would be deleted since its done above log retention period
-    verify(mockFs, timeout(10000)).delete(app1.path, true);
-    //app2Dir is not expected to be deleted since it is below the threshold
-    verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
-
-    //Now, let's change the confs
+    testcase
+            .setupAndRunDeletionService()
+            //app1Dir should be deleted since it is older than the log retention period
+            .verifyAppDirDeleted(1, 10000L)
+            //app2Dir is not expected to be deleted since it is below the threshold
+            .verifyAppDirNotDeleted(2, 3000L);
+
+    //Now, let's change the log aggregation retention configs
     conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
     conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
             checkIntervalSeconds);
-    //We have not called refreshLogSettings,hence don't expect to see the changed conf values
-    assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
-    
-    //refresh the log settings
-    deletionSvc.refreshLogRetentionSettings();
 
-    //Check interval time should reflect the new value
-    Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
-    //app2Dir should be deleted since it falls above the threshold
-    verify(mockFs, timeout(10000)).delete(app2.path, true);
-    deletionSvc.stop();
+    testcase
+            //We have not called refreshLogRetentionSettings, hence don't expect to see
+            // the changed conf values
+            .verifyCheckIntervalMilliSecondsNotEqualTo(checkIntervalMilliSeconds)
+            //refresh the log settings
+            .refreshLogRetentionSettings()
+            //Check interval time should reflect the new value
+            .verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds)
+            //app2Dir should be deleted since it falls above the threshold
+            .verifyAppDirDeleted(2, 10000L)
+            .teardown();
   }
   
   @Test
@@ -303,52 +188,30 @@ public class TestAggregatedLogDeletionService {
 
     // prevent us from picking up the same mockfs instance from another test
     FileSystem.closeAll();
-    Path rootPath = new Path(ROOT);
-    FileSystem rootFs = rootPath.getFileSystem(conf);
-    FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
-
-    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
-
-    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
-    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
-
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
-
-    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
-            USER_ME, SUFFIX, appId1, now);
-
-    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
-            USER_ME, SUFFIX, now);
-    PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
-
-    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
-    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
-    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
-    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
-
-    final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
-
-    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
-            finishedApplications);
-    deletionSvc.init(conf);
-    deletionSvc.start();
- 
-    verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
-    verify(mockFs, never()).delete(app1.path, true);
 
-    // modify the timestamp of the logs and verify if it's picked up quickly
-    app1.changeModificationTime(toDeleteTime);
-    app1Log1.changeModificationTime(toDeleteTime);
-    bucketDir.changeModificationTime(toDeleteTime);
-    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
-    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
-    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
-    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
-
-    verify(mockFs, timeout(10000)).delete(app1.path, true);
-
-    deletionSvc.stop();
+    LogAggregationTestcaseBuilder.create(conf)
+            .withRootPath(ROOT)
+            .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
+            .withUserDir(USER_ME, now)
+            .withSuffixDir(SUFFIX, now)
+            .withBucketDir(now)
+            .withApps(Lists.newArrayList(
+                    new AppDescriptor(now,
+                            Lists.newArrayList(Pair.of(DIR_HOST1, now))),
+                    new AppDescriptor(now)))
+            .withFinishedApps(1)
+            .withRunningApps()
+            .build()
+            .setupAndRunDeletionService()
+            .verifyAnyPathListedAtLeast(4, 10000L)
+            .verifyAppDirNotDeleted(1, NO_TIMEOUT)
+            // modify the timestamp of the logs and verify if it is picked up quickly
+            .changeModTimeOfApp(1, toDeleteTime)
+            .changeModTimeOfAppLogDir(1, 1, toDeleteTime)
+            .changeModTimeOfBucketDir(toDeleteTime)
+            .reinitAllPaths()
+            .verifyAppDirDeleted(1, 10000L)
+            .teardown();
   }
 
   @Test
@@ -357,44 +220,25 @@ public class TestAggregatedLogDeletionService {
 
     // prevent us from picking up the same mockfs instance from another test
     FileSystem.closeAll();
-    Path rootPath = new Path(ROOT);
-    FileSystem rootFs = rootPath.getFileSystem(conf);
-    FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
-
-    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
-
-    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
-    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
-    PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
-
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
-    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
-    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
-
-    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
-
-    PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
-    PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);
-    PathWithFileStatus app3 = createDirLogPathWithFileStatus(bucketDir.path, appId3.toString(), 0);
-    PathWithFileStatus app3Log3 = createDirLogPathWithFileStatus(app3.path, DIR_HOST1, 0);
-
-    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{
-        app1.fileStatus,app2.fileStatus, app3.fileStatus});
-    when(mockFs.listStatus(app1.path)).thenThrow(
-        new RuntimeException("Should be caught and logged"));
-    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{});
-    when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log3.fileStatus});
-
-    final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
-            Arrays.asList(appId1, appId3));
-
-    ApplicationClientProtocol rmClient = createMockRMClient(finishedApplications, null);
-    AggregatedLogDeletionService.LogDeletionTask deletionTask =
-        new AggregatedLogDeletionService.LogDeletionTask(conf, TEN_DAYS_IN_SECONDS, rmClient);
-    deletionTask.run();
-    verify(mockFs).delete(app3.path, true);
+    long modTime = 0L;
+
+    LogAggregationTestcaseBuilder.create(conf)
+            .withRootPath(ROOT)
+            .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
+            .withUserDir(USER_ME, modTime)
+            .withSuffixDir(SUFFIX, modTime)
+            .withBucketDir(modTime, "0")
+            .withApps(Lists.newArrayList(
+                    new AppDescriptor(modTime),
+                    new AppDescriptor(modTime),
+                    new AppDescriptor(modTime, Lists.newArrayList(Pair.of(DIR_HOST1, modTime)))))
+            .withAdditionalAppDirs(Lists.newArrayList(Pair.of("application_a", modTime)))
+            .withFinishedApps(1, 3)
+            .withRunningApps()
+            .injectExceptionForAppDirDeletion(1)
+            .build()
+            .runDeletionTask(TEN_DAYS_IN_SECONDS)
+            .verifyAppDirDeleted(3, NO_TIMEOUT);
   }
 
   static class MockFileSystem extends FilterFileSystem {
@@ -403,98 +247,10 @@ public class TestAggregatedLogDeletionService {
     }
 
     public void initialize(URI name, Configuration conf) throws IOException {}
-  }
-
-  private static ApplicationClientProtocol createMockRMClient(
-      List<ApplicationId> finishedApplications,
-      List<ApplicationId> runningApplications) throws Exception {
-    final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
-    if (finishedApplications != null && !finishedApplications.isEmpty()) {
-      for (ApplicationId appId : finishedApplications) {
-        GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
-        GetApplicationReportResponse response = createApplicationReportWithFinishedApplication();
-        when(mockProtocol.getApplicationReport(request)).thenReturn(response);
-      }
-    }
-    if (runningApplications != null && !runningApplications.isEmpty()) {
-      for (ApplicationId appId : runningApplications) {
-        GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
-        GetApplicationReportResponse response = createApplicationReportWithRunningApplication();
-        when(mockProtocol.getApplicationReport(request)).thenReturn(response);
-      }
-    }
-    return mockProtocol;
-  }
-
-  private static GetApplicationReportResponse createApplicationReportWithRunningApplication() {
-    ApplicationReport report = mock(ApplicationReport.class);
-    when(report.getYarnApplicationState()).thenReturn(
-      YarnApplicationState.RUNNING);
-    GetApplicationReportResponse response =
-        mock(GetApplicationReportResponse.class);
-    when(response.getApplicationReport()).thenReturn(report);
-    return response;
-  }
-
-  private static GetApplicationReportResponse createApplicationReportWithFinishedApplication() {
-    ApplicationReport report = mock(ApplicationReport.class);
-    when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
-    GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
-    when(response.getApplicationReport()).thenReturn(report);
-    return response;
-  }
-
-  private static class PathWithFileStatus {
-    private final Path path;
-    private FileStatus fileStatus;
-
-    PathWithFileStatus(Path path, FileStatus fileStatus) {
-      this.path = path;
-      this.fileStatus = fileStatus;
-    }
-
-    public void changeModificationTime(long modTime) {
-      fileStatus = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
-              fileStatus.getReplication(),
-              fileStatus.getBlockSize(), modTime, fileStatus.getPath());
-    }
-  }
-
-  private static class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
-    private final List<ApplicationId> finishedApplications;
-    private final List<ApplicationId> runningApplications;
-    private final Configuration conf;
-
-    AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
-                                               List<ApplicationId> finishedApplications) {
-      this(runningApplications, finishedApplications, null);
-    }
-
-    AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
-                                               List<ApplicationId> finishedApplications,
-                                               Configuration conf) {
-      this.runningApplications = runningApplications;
-      this.finishedApplications = finishedApplications;
-      this.conf = conf;
-    }
-
-    @Override
-    protected ApplicationClientProtocol createRMClient() throws IOException {
-      try {
-        return createMockRMClient(finishedApplications, runningApplications);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    protected Configuration createConf() {
-      return conf;
-    }
 
     @Override
-    protected void stopRMClient() {
-      // DO NOTHING
+    public boolean hasPathCapability(Path path, String capability) {
+      return true;
     }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
index 2d2fb49c0ef..c1b991b9bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -18,24 +18,6 @@
 
 package org.apache.hadoop.yarn.logaggregation.filecontroller;
 
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Writer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,6 +38,21 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS;
+import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.REMOTE_LOG_ROOT;
+import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
+import static org.junit.Assert.*;
+
 /**
  * Test LogAggregationFileControllerFactory.
  */
@@ -63,7 +60,6 @@ public class TestLogAggregationFileControllerFactory extends Configured {
   private static final Logger LOG = LoggerFactory.getLogger(
       TestLogAggregationFileControllerFactory.class);
 
-  private static final String REMOTE_LOG_ROOT = "target/app-logs/";
   private static final String REMOTE_DEFAULT_DIR = "default/";
   private static final String APP_OWNER = "test";
 
@@ -87,8 +83,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
   public void setup() throws IOException {
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT +
-        REMOTE_DEFAULT_DIR);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT + REMOTE_DEFAULT_DIR);
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
     setConf(conf);
   }
@@ -143,36 +138,15 @@ public class TestLogAggregationFileControllerFactory extends Configured {
   @Test(expected = Exception.class)
   public void testLogAggregationFileControllerFactoryClassNotSet() {
     Configuration conf = getConf();
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
-        "TestLogAggregationFileController");
+    conf.set(LOG_AGGREGATION_FILE_FORMATS, "TestLogAggregationFileController");
     new LogAggregationFileControllerFactory(conf);
     fail("TestLogAggregationFileController's class was not set, " +
         "but the factory creation did not fail.");
   }
 
-  private void enableFileControllers(
-      List<Class<? extends LogAggregationFileController>> fileControllers,
-      List<String> fileControllerNames) {
-    Configuration conf = getConf();
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
-        StringUtils.join(fileControllerNames, ","));
-    for (int i = 0; i < fileControllers.size(); i++) {
-      Class<? extends LogAggregationFileController> fileController =
-          fileControllers.get(i);
-      String controllerName = fileControllerNames.get(i);
-
-      conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
-          controllerName), fileController, LogAggregationFileController.class);
-      conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
-          controllerName), REMOTE_LOG_ROOT + controllerName + "/");
-      conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
-          controllerName), controllerName);
-    }
-  }
-
   @Test
   public void testLogAggregationFileControllerFactory() throws Exception {
-    enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
+    enableFileControllers(getConf(), ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
     LogAggregationFileControllerFactory factory =
         new LogAggregationFileControllerFactory(getConf());
     List<LogAggregationFileController> list =
@@ -199,8 +173,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
 
   @Test
   public void testClassConfUsed() {
-    enableFileControllers(Collections.singletonList(
-        LogAggregationTFileController.class),
+    enableFileControllers(getConf(), Collections.singletonList(LogAggregationTFileController.class),
         Collections.singletonList("TFile"));
     LogAggregationFileControllerFactory factory =
         new LogAggregationFileControllerFactory(getConf());
@@ -215,7 +188,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
   @Test
   public void testNodemanagerConfigurationIsUsed() {
     Configuration conf = getConf();
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
+    conf.set(LOG_AGGREGATION_FILE_FORMATS, "TFile");
     LogAggregationFileControllerFactory factory =
         new LogAggregationFileControllerFactory(conf);
     LogAggregationFileController fc = factory.getFileControllerForWrite();
@@ -231,7 +204,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
     Configuration conf = getConf();
     conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR);
     conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
+    conf.set(LOG_AGGREGATION_FILE_FORMATS, "TFile");
 
     LogAggregationFileControllerFactory factory =
         new LogAggregationFileControllerFactory(getConf());
@@ -268,20 +241,19 @@ public class TestLogAggregationFileControllerFactory extends Configured {
     }
 
     @Override
-    public void initializeWriter(LogAggregationFileControllerContext context)
-        throws IOException {
+    public void initializeWriter(LogAggregationFileControllerContext context) {
       // Do Nothing
     }
 
     @Override
     public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
-        OutputStream os) throws IOException {
+        OutputStream os) {
       return false;
     }
 
     @Override
     public List<ContainerLogMeta> readAggregatedLogsMeta(
-        ContainerLogsRequest logRequest) throws IOException {
+        ContainerLogsRequest logRequest) {
       return null;
     }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
new file mode 100644
index 00000000000..49042cf458b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.testutils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.logaggregation.testutils.MockRMClientUtils.createMockRMClient;
+
+/**
+ * {@link AggregatedLogDeletionService} for tests: the RM client comes from
+ * {@code MockRMClientUtils.createMockRMClient} and the configuration is the one
+ * passed to the constructor.
+ */
+public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
+  private final Configuration conf;
+  private final List<ApplicationId> runningApplications;
+  private final List<ApplicationId> finishedApplications;
+
+  /**
+   * Creates a service without a configuration, so {@link #createConf()} returns null.
+   *
+   * @param runningApplications applications handed to the mock RM client as running
+   * @param finishedApplications applications handed to the mock RM client as finished
+   */
+  public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
+      List<ApplicationId> finishedApplications) {
+    this(runningApplications, finishedApplications, null);
+  }
+
+  /**
+   * Creates a service that reports the given configuration from {@link #createConf()}.
+   *
+   * @param runningApplications applications handed to the mock RM client as running
+   * @param finishedApplications applications handed to the mock RM client as finished
+   * @param conf configuration returned by {@link #createConf()}, may be null
+   */
+  public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
+      List<ApplicationId> finishedApplications, Configuration conf) {
+    this.conf = conf;
+    this.runningApplications = runningApplications;
+    this.finishedApplications = finishedApplications;
+  }
+
+  /**
+   * Builds the RM client from the two application lists.
+   *
+   * @return the client created by {@code createMockRMClient}
+   * @throws IOException wrapping whatever {@code createMockRMClient} throws
+   */
+  @Override
+  protected ApplicationClientProtocol createRMClient() throws IOException {
+    ApplicationClientProtocol rmClient;
+    try {
+      rmClient = createMockRMClient(finishedApplications, runningApplications);
+    } catch (Exception cause) {
+      throw new IOException(cause);
+    }
+    return rmClient;
+  }
+
+  /** @return the configuration given at construction time, possibly null */
+  @Override
+  protected Configuration createConf() {
+    return conf;
+  }
+
+  /** Intentionally a no-op. */
+  @Override
+  protected void stopRMClient() {
+    // DO NOTHING
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/FileStatusUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/FileStatusUtils.java
new file mode 100644
index 00000000000..c4af8618614
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/FileStatusUtils.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.testutils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+
+/**
+ * Static factories for the {@link FileStatus} and {@link PathWithFileStatus}
+ * fixtures used by the log aggregation tests.
+ */
+public final class FileStatusUtils {
+  private FileStatusUtils() {
+    // Utility class: static members only, never instantiated.
+  }
+
+  /**
+   * Creates the entry for the remote log directory of an application.
+   *
+   * @param remoteRootLogDir remote root log directory
+   * @param appId application the directory belongs to
+   * @param user user component of the path
+   * @param suffix suffix component of the path
+   * @param modificationTime modification time stored in the status
+   * @return the path resolved by {@code LogAggregationUtils.getRemoteAppLogDir},
+   *         paired with an empty directory status
+   */
+  public static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
+      ApplicationId appId, String user, String suffix, long modificationTime) {
+    Path path = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix);
+    return new PathWithFileStatus(path, createEmptyFileStatus(modificationTime, path));
+  }
+
+  /**
+   * Creates a directory status whose length, replication and block size are zero.
+   *
+   * @param modificationTime modification time stored in the status
+   * @param path path the status describes
+   * @return the directory status
+   */
+  public static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
+    return new FileStatus(0, true, 0, 0, modificationTime, path);
+  }
+
+  /**
+   * Creates the entry for a regular file of length 10.
+   *
+   * @param baseDir parent directory
+   * @param childDir name of the child below {@code baseDir}
+   * @param modificationTime modification time stored in the status
+   * @return the child path paired with a file status
+   */
+  public static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
+      long modificationTime) {
+    Path logPath = new Path(baseDir, childDir);
+    FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
+    return new PathWithFileStatus(logPath, fStatus);
+  }
+
+  /**
+   * Creates the entry for a directory that reports a length of 10.
+   *
+   * @param baseDir parent directory
+   * @param childDir name of the child below {@code baseDir}
+   * @param modificationTime modification time stored in the status
+   * @return the child path paired with a directory status
+   */
+  public static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
+      long modificationTime) {
+    Path logPath = new Path(baseDir, childDir);
+    FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
+    return new PathWithFileStatus(logPath, fStatus);
+  }
+
+  /**
+   * Creates the entry for the bucket directory of an application.
+   *
+   * @param remoteRootLogPath remote root log directory
+   * @param user user component of the path
+   * @param suffix suffix component of the path
+   * @param appId application passed on to the bucket directory lookup
+   * @param modificationTime modification time stored in the status
+   * @return the path resolved by {@code LogAggregationUtils.getRemoteBucketDir},
+   *         paired with an empty directory status
+   */
+  public static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
+      String user, String suffix, ApplicationId appId, long modificationTime) {
+    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
+    return new PathWithFileStatus(bucketDir, createEmptyFileStatus(modificationTime, bucketDir));
+  }
+
+  /**
+   * Creates a file status with a replication and block size of 1.
+   *
+   * @param length length stored in the status
+   * @param modificationTime modification time stored in the status
+   * @param logPath path the status describes
+   * @return the file status
+   */
+  public static FileStatus createFileStatusWithLengthForFile(long length, long modificationTime,
+      Path logPath) {
+    return new FileStatus(length, false, 1, 1, modificationTime, logPath);
+  }
+
+  /**
+   * Creates a directory status with a replication and block size of 1.
+   *
+   * @param length length stored in the status
+   * @param modificationTime modification time stored in the status
+   * @param logPath path the status describes
+   * @return the directory status
+   */
+  public static FileStatus createFileStatusWithLengthForDir(long length, long modificationTime,
+      Path logPath) {
+    return new FileStatus(length, true, 1, 1, modificationTime, logPath);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
new file mode 100644
index 00000000000..8f535d40714
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.testutils;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
+import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.yarn.logaggregation.testutils.FileStatusUtils.*;
+import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
+import static org.apache.hadoop.yarn.logaggregation.testutils.MockRMClientUtils.createMockRMClient;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+public class LogAggregationTestcase {
+  private static final Logger LOG = LoggerFactory.getLogger(LogAggregationTestcase.class);
+
+  private final Configuration conf;
+  private final long now;
+  private PathWithFileStatus bucketDir;
+  private final long bucketDirModTime;
+  private PathWithFileStatus userDir;
+  private final String userDirName;
+  private final long userDirModTime;
+  private PathWithFileStatus suffixDir;
+  private final String suffix;
+  private final String suffixDirName;
+  private final long suffixDirModTime;
+  private final String bucketId;
+  private final Path remoteRootLogPath;
+  private final Map<Integer, Exception> injectedAppDirDeletionExceptions;
+  private final List<String> fileControllers;
+  private final List<Pair<String, Long>> additionalAppDirs;
+
+  private final List<ApplicationId> applicationIds = new ArrayList<>();
+  private final int[] runningAppIds;
+  private final int[] finishedAppIds;
+  private final List<List<PathWithFileStatus>> appFiles = new ArrayList<>();
+  private final FileSystem mockFs;
+  private List<PathWithFileStatus> appDirs;
+  private final List<AppDescriptor> appDescriptors;
+  private AggregatedLogDeletionServiceForTest deletionService;
+
+  public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws IOException {
+    conf = builder.conf;
+    now = builder.now;
+    bucketDir = builder.bucketDir;
+    bucketDirModTime = builder.bucketDirModTime;
+    userDir = builder.userDir;
+    userDirName = builder.userDirName;
+    userDirModTime = builder.userDirModTime;
+    suffix = builder.suffix;
+    suffixDir = builder.suffixDir;
+    suffixDirName = builder.suffixDirName;
+    suffixDirModTime = builder.suffixDirModTime;
+    bucketId = builder.bucketId;
+    appDescriptors = builder.apps;
+    runningAppIds = builder.runningAppIds;
+    finishedAppIds = builder.finishedAppIds;
+    remoteRootLogPath = builder.remoteRootLogPath;
+    injectedAppDirDeletionExceptions = builder.injectedAppDirDeletionExceptions;
+    fileControllers = builder.fileControllers;
+    additionalAppDirs = builder.additionalAppDirs;
+    // Stubbing targets the raw file system wrapped by the builder's FilterFileSystem root.
+    mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem();
+    validateAppControllers();
+    setupMocks();
+  }
+
+  private void validateAppControllers() {
+    // Fails fast when an app descriptor names a controller that was not configured.
+    Set<String> referenced = appDescriptors.stream().map(app -> app.fileController)
+        .filter(Objects::nonNull).collect(Collectors.toSet());
+    Set<String> available =
+        fileControllers == null ? Sets.newHashSet() : new HashSet<>(fileControllers);
+    if (available.containsAll(referenced)) {
+      return;
+    }
+    String message = String.format("Invalid controller defined!" +
+        " Available: %s, Actual: %s", available, referenced);
+    throw new IllegalStateException(message);
+  }
+
+  /** Creates the apps, then stubs the directory chain below every root path. */
+  private void setupMocks() throws IOException {
+    createApplicationsByDescriptors();
+
+    for (Path rootPath : determineRootPaths()) {
+      userDir = createDirLogPathWithFileStatus(rootPath, userDirName, userDirModTime);
+      suffixDir = createDirLogPathWithFileStatus(userDir.path, suffixDirName, suffixDirModTime);
+      if (bucketId != null) {
+        bucketDir = createDirLogPathWithFileStatus(suffixDir.path, bucketId, bucketDirModTime);
+      } else {
+        // Only this branch needs an app id (an arbitrary one: the first), so a
+        // testcase with an explicit bucket id may define no apps at all.
+        bucketDir = createDirBucketDirLogPathWithFileStatus(rootPath, userDirName, suffix,
+            applicationIds.get(0), bucketDirModTime);
+      }
+      setupListStatusForPath(rootPath, userDir);
+      initFileSystemListings(rootPath.getName());
+    }
+  }
+
+  private List<Path> determineRootPaths() {
+    // Returns one root per file controller, or just the remote root when there are none.
+    //
+    // Generic path: <remote-app-log-dir>/<user>/bucket-<suffix>/<bucket id>/
+    //     <application id>/<NodeManager id>
+    // remoteRootLogPath: <remote-app-log-dir>/
+    //     example: mockfs://foo/tmp/logs/
+    // userDir: <remote-app-log-dir>/<user>/
+    //     example: mockfs://foo/tmp/logs/me/
+    // suffixDir: <remote-app-log-dir>/<user>/bucket-<suffix>/
+    //     example: mockfs://foo/tmp/logs/me/bucket-logs/
+    // bucketDir: <remote-app-log-dir>/<user>/bucket-<suffix>/<bucket id>/
+    //     example: mockfs://foo/tmp/logs/me/bucket-logs/0001/
+    // remoteRootLogPath with controller: <remote-app-log-dir>/<controllerName>
+    //     example: mockfs://foo/tmp/logs/IFile
+
+    List<Path> rootPaths = new ArrayList<>();
+    if (fileControllers == null || fileControllers.isEmpty()) {
+      // No controllers configured: the remote root is the only root.
+      rootPaths.add(remoteRootLogPath);
+      return rootPaths;
+    }
+
+    for (String fileController : fileControllers) {
+      rootPaths.add(new Path(remoteRootLogPath, fileController));
+    }
+    return rootPaths;
+  }
+
+  private void initFileSystemListings(String controllerName) throws IOException {
+    setupListStatusForPath(userDir, suffixDir);
+    setupListStatusForPath(suffixDir, bucketDir);
+    setupListStatusForPath(bucketDir, appDirs.stream()
+            .filter(app -> app.path.toString().contains(controllerName))
+            .map(app -> app.fileStatus)
+            .toArray(FileStatus[]::new));
+    // Extra app dirs only get an empty listing; they are not added to bucketDir's listing.
+    for (Pair<String, Long> appDirPair : additionalAppDirs) {
+      PathWithFileStatus appDir = createDirLogPathWithFileStatus(bucketDir.path,
+              appDirPair.getLeft(), appDirPair.getRight());
+      setupListStatusForPath(appDir, new FileStatus[] {});
+    }
+  }
+
+  private void createApplicationsByDescriptors() throws IOException {
+    int len = appDescriptors.size();
+    appDirs = new ArrayList<>(len);
+    // Ids passed to createApplicationId are 1-based (i + 1); appDirs is indexed from 0.
+    for (int i = 0; i < len; i++) {
+      AppDescriptor appDesc = appDescriptors.get(i);
+      ApplicationId applicationId = appDesc.createApplicationId(now, i + 1);
+      applicationIds.add(applicationId);
+      Path basePath = this.remoteRootLogPath;
+      if (appDesc.fileController != null) {
+        basePath = new Path(basePath, appDesc.fileController);
+      }
+
+      PathWithFileStatus appDir = createPathWithFileStatusForAppId(
+              basePath, applicationId, userDirName, suffix, appDesc.modTimeOfAppDir);
+      LOG.debug("Created application with ID '{}' to path '{}'", applicationId, appDir.path);
+      appDirs.add(appDir);
+      addAppChildrenFiles(appDesc, appDir);
+    }
+
+    setupFsMocksForAppsAndChildrenFiles();
+    // NOTE(review): key is a 0-based index into appDirs, other app ids are 1-based; confirm.
+    for (Map.Entry<Integer, Exception> e : injectedAppDirDeletionExceptions.entrySet()) {
+      when(mockFs.delete(this.appDirs.get(e.getKey()).path, true)).thenThrow(e.getValue());
+    }
+  }
+
+  private void setupFsMocksForAppsAndChildrenFiles() throws IOException {
+    // appFiles and appDirs are parallel lists: entry i of appFiles holds the
+    // children that listStatus reports for app dir i.
+    for (int appIndex = 0; appIndex < appDirs.size(); appIndex++) {
+      FileStatus[] childStatuses = appFiles.get(appIndex).stream()
+          .map(child -> child.fileStatus)
+          .toArray(FileStatus[]::new);
+      setupListStatusForPath(appDirs.get(appIndex).path, childStatuses);
+    }
+  }
+
+  private void setupListStatusForPath(Path dir, PathWithFileStatus pathWithFileStatus)
+      throws IOException {
+    setupListStatusForPath(dir, new FileStatus[] {pathWithFileStatus.fileStatus});
+  }
+
+  private void setupListStatusForPath(PathWithFileStatus dir, PathWithFileStatus pathWithFileStatus)
+      throws IOException {
+    setupListStatusForPath(dir.path, new FileStatus[] {pathWithFileStatus.fileStatus});
+  }
+
+  // Every overload funnels into this one, the only place that stubs listStatus on mockFs.
+  private void setupListStatusForPath(Path dir, FileStatus[] fileStatuses) throws IOException {
+    LOG.debug("Setting up listStatus. Parent: {}, files: {}", dir, fileStatuses);
+    when(mockFs.listStatus(dir)).thenReturn(fileStatuses);
+  }
+
+  private void setupListStatusForPath(PathWithFileStatus dir, FileStatus[] fileStatuses)
+      throws IOException {
+    setupListStatusForPath(dir.path, fileStatuses);
+  }
+
+  public LogAggregationTestcase setupAndRunDeletionService() {
+    List<ApplicationId> finished = createFinishedAppsList();
+    deletionService =
+        new AggregatedLogDeletionServiceForTest(createRunningAppsList(), finished, conf);
+    deletionService.init(conf);
+    deletionService.start();
+    return this;
+  }
+
+  private List<ApplicationId> createRunningAppsList() {
+    // Ids given to the testcase are 1-based, applicationIds is 0-based.
+    List<ApplicationId> result = new ArrayList<>(runningAppIds.length);
+    for (int oneBasedId : runningAppIds) {
+      result.add(applicationIds.get(oneBasedId - 1));
+    }
+    return result;
+  }
+
+  private List<ApplicationId> createFinishedAppsList() {
+    // Ids given to the testcase are 1-based, applicationIds is 0-based.
+    List<ApplicationId> result = new ArrayList<>(finishedAppIds.length);
+    for (int oneBasedId : finishedAppIds) {
+      result.add(applicationIds.get(oneBasedId - 1));
+    }
+    return result;
+  }
+
+  public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws Exception {
+    // Drives one deletion pass by calling the task's run() directly.
+    ApplicationClientProtocol mockRmClient =
+        createMockRMClient(createFinishedAppsList(), createRunningAppsList());
+    AggregatedLogDeletionService.LogDeletionTask task =
+        new AggregatedLogDeletionService.LogDeletionTask(conf, retentionSeconds, mockRmClient);
+    task.run();
+    return this;
+  }
+
+  private void addAppChildrenFiles(AppDescriptor appDesc, PathWithFileStatus appDir) {
+    // Builds the child entries of one app dir; the resulting list is appended to
+    // appFiles, so its position there follows the order of the calls.
+    List<PathWithFileStatus> children = new ArrayList<>();
+    for (Pair<String, Long> nameAndModTime : appDesc.filesWithModDate) {
+      children.add(createFileLogPathWithFileStatus(
+          appDir.path, nameAndModTime.getLeft(), nameAndModTime.getRight()));
+    }
+    appFiles.add(children);
+  }
+
+  /**
+   * Verifies, one application at a time, that each listed application
+   * directory was deleted.
+   *
+   * @param timeout timeout forwarded to {@link #verifyAppDirDeleted(int, long)}
+   * @param ids 1-based application ids to check
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppDirsDeleted(long timeout, int... ids) throws IOException {
+    for (int appId : ids) {
+      verifyAppDirDeleted(appId, timeout);
+    }
+    return this;
+  }
+
+  /**
+   * Verifies, one application at a time, that none of the listed application
+   * directories was deleted.
+   *
+   * @param timeout timeout forwarded to {@link #verifyAppDirNotDeleted(int, long)}
+   * @param ids 1-based application ids to check
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppDirsNotDeleted(long timeout, int... ids)
+          throws IOException {
+    for (int appId : ids) {
+      verifyAppDirNotDeleted(appId, timeout);
+    }
+    return this;
+  }
+
+  /**
+   * Verifies that the directory of the given application was deleted exactly once.
+   *
+   * @param id 1-based application id
+   * @param timeout timeout forwarded to the verification
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppDirDeleted(int id, long timeout) throws IOException {
+    final int expectedDeleteCalls = 1;
+    verifyAppDirDeletion(id, expectedDeleteCalls, timeout);
+    return this;
+  }
+
+  /**
+   * Verifies that the directory of the given application was never deleted.
+   *
+   * @param id 1-based application id
+   * @param timeout timeout forwarded to the verification
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppDirNotDeleted(int id, long timeout) throws IOException {
+    final int expectedDeleteCalls = 0;
+    verifyAppDirDeletion(id, expectedDeleteCalls, timeout);
+    return this;
+  }
+
+  /**
+   * Verifies that every (application id, file number) pair refers to a file
+   * that was deleted.
+   *
+   * @param timeout timeout forwarded to {@link #verifyAppFileDeleted(int, int, long)}
+   * @param pairs pairs of 1-based application id (left) and 1-based file number (right)
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppFilesDeleted(long timeout,
+                                                      List<Pair<Integer, Integer>> pairs)
+          throws IOException {
+    for (Pair<Integer, Integer> appAndFile : pairs) {
+      Integer appId = appAndFile.getLeft();
+      Integer fileNo = appAndFile.getRight();
+      verifyAppFileDeleted(appId, fileNo, timeout);
+    }
+    return this;
+  }
+
+  /**
+   * Verifies that every (application id, file number) pair refers to a file
+   * that was not deleted.
+   *
+   * @param timeout timeout forwarded to {@link #verifyAppFileNotDeleted(int, int, long)}
+   * @param pairs pairs of 1-based application id (left) and 1-based file number (right)
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppFilesNotDeleted(long timeout,
+                                                         List<Pair<Integer, Integer>> pairs)
+          throws IOException {
+    for (Pair<Integer, Integer> appAndFile : pairs) {
+      Integer appId = appAndFile.getLeft();
+      Integer fileNo = appAndFile.getRight();
+      verifyAppFileNotDeleted(appId, fileNo, timeout);
+    }
+    return this;
+  }
+
+  /**
+   * Verifies that the given file of the given application was deleted exactly once.
+   *
+   * @param id 1-based application id
+   * @param fileNo 1-based number of the file within the application
+   * @param timeout timeout forwarded to the verification
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppFileDeleted(int id, int fileNo, long timeout)
+          throws IOException {
+    final int expectedDeleteCalls = 1;
+    verifyAppFileDeletion(id, fileNo, expectedDeleteCalls, timeout);
+    return this;
+  }
+
+  /**
+   * Verifies that the given file of the given application was never deleted.
+   *
+   * @param id 1-based application id
+   * @param fileNo 1-based number of the file within the application
+   * @param timeout timeout forwarded to the verification
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAppFileNotDeleted(int id, int fileNo, long timeout)
+          throws IOException {
+    final int expectedDeleteCalls = 0;
+    verifyAppFileDeletion(id, fileNo, expectedDeleteCalls, timeout);
+    return this;
+  }
+
+  private void verifyAppDirDeletion(int id, int times, long timeout) throws IOException {
+    if (timeout == NO_TIMEOUT) {
+      verify(mockFs, times(times)).delete(this.appDirs.get(id - 1).path, true);
+    } else {
+      verify(mockFs, timeout(timeout).times(times)).delete(this.appDirs.get(id - 1).path, true);
+    }
+  }
+
+  /**
+   * Verifies how many times a recursive delete was invoked on the mocked file
+   * system for one file of the given application.
+   *
+   * @param appId 1-based application id
+   * @param fileNo 1-based number of the file within the application
+   * @param times expected number of delete invocations
+   * @param timeout verification timeout; {@code NO_TIMEOUT} verifies immediately
+   * @throws IOException declared by the mocked file system call
+   */
+  private void verifyAppFileDeletion(int appId, int fileNo, int times, long timeout)
+          throws IOException {
+    List<PathWithFileStatus> childrenFiles = this.appFiles.get(appId - 1);
+    PathWithFileStatus file = childrenFiles.get(fileNo - 1);
+    if (timeout == NO_TIMEOUT) {
+      // Same handling as verifyAppDirDeletion: the NO_TIMEOUT sentinel is negative
+      // and must not be handed to Mockito's timeout(), so verify without waiting.
+      verify(mockFs, times(times)).delete(file.path, true);
+    } else {
+      verify(mockFs, timeout(timeout).times(times)).delete(file.path, true);
+    }
+  }
+
+  /**
+   * Stops the deletion service if one was created.
+   * The service only exists after {@code setupAndRunDeletionService()} was called,
+   * so a testcase that never started it can still be torn down safely.
+   */
+  public void teardown() {
+    if (deletionService != null) {
+      deletionService.stop();
+    }
+  }
+
+  /**
+   * Delegates to {@code refreshLogRetentionSettings()} of the deletion service.
+   *
+   * @return this testcase, to allow call chaining
+   * @throws IOException if the deletion service fails to refresh its settings
+   */
+  public LogAggregationTestcase refreshLogRetentionSettings() throws IOException {
+    this.deletionService.refreshLogRetentionSettings();
+    return this;
+  }
+
+  /**
+   * @return the deletion service held by this testcase
+   */
+  public AggregatedLogDeletionService getDeletionService() {
+    return this.deletionService;
+  }
+
+  /**
+   * Asserts that the check interval reported by the deletion service equals
+   * the given value.
+   *
+   * @param checkIntervalMilliSeconds expected check interval
+   * @return this testcase, to allow call chaining
+   */
+  public LogAggregationTestcase verifyCheckIntervalMilliSecondsEqualTo(
+          int checkIntervalMilliSeconds) {
+    assertEquals(checkIntervalMilliSeconds, this.deletionService.getCheckIntervalMsecs());
+    return this;
+  }
+
+  /**
+   * Asserts that the check interval reported by the deletion service differs
+   * from the given value.
+   *
+   * @param checkIntervalMilliSeconds check interval that must not be in effect
+   * @return this testcase, to allow call chaining
+   */
+  public LogAggregationTestcase verifyCheckIntervalMilliSecondsNotEqualTo(
+          int checkIntervalMilliSeconds) {
+    final boolean intervalsDiffer =
+            checkIntervalMilliSeconds != this.deletionService.getCheckIntervalMsecs();
+    assertTrue(intervalsDiffer);
+    return this;
+  }
+
+  /**
+   * Verifies that {@code listStatus} was invoked on the mocked file system,
+   * for any path, at least the given number of times within the timeout.
+   *
+   * @param atLeast minimum number of expected {@code listStatus} invocations
+   * @param timeout timeout passed to Mockito's {@code timeout(...)}
+   * @return this testcase, to allow call chaining
+   * @throws IOException declared by the mocked file system call
+   */
+  public LogAggregationTestcase verifyAnyPathListedAtLeast(int atLeast, long timeout)
+          throws IOException {
+    verify(this.mockFs, timeout(timeout).atLeast(atLeast))
+            .listStatus(any(Path.class));
+    return this;
+  }
+
+  /**
+   * Changes the modification time of the directory of the given application.
+   *
+   * @param appId 1-based application id
+   * @param modTime new modification time
+   * @return this testcase, to allow call chaining
+   */
+  public LogAggregationTestcase changeModTimeOfApp(int appId, long modTime) {
+    this.appDirs.get(appId - 1).changeModificationTime(modTime);
+    return this;
+  }
+
+  /**
+   * Changes the modification time of one child entry of the given application.
+   *
+   * @param appId 1-based application id
+   * @param fileNo 1-based number of the child entry within the application
+   * @param modTime new modification time
+   * @return this testcase, to allow call chaining
+   */
+  public LogAggregationTestcase changeModTimeOfAppLogDir(int appId, int fileNo, long modTime) {
+    final PathWithFileStatus target = this.appFiles.get(appId - 1).get(fileNo - 1);
+    target.changeModificationTime(modTime);
+    return this;
+  }
+
+  /**
+   * Changes the modification time of the bucket directory.
+   *
+   * @param modTime new modification time
+   * @return this testcase, to allow call chaining
+   */
+  public LogAggregationTestcase changeModTimeOfBucketDir(long modTime) {
+    this.bucketDir.changeModificationTime(modTime);
+    return this;
+  }
+
+  /**
+   * Re-initializes the file system listings for every root path, using the
+   * last path component of each root as the controller name, and then sets up
+   * the file system mocks for the applications and their children files again.
+   *
+   * @return this testcase, to allow call chaining
+   * @throws IOException if re-initializing the listings or mocks fails
+   */
+  public LogAggregationTestcase reinitAllPaths() throws IOException {
+    for (Path root : determineRootPaths()) {
+      initFileSystemListings(root.getName());
+    }
+    setupFsMocksForAppsAndChildrenFiles();
+    return this;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcaseBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcaseBuilder.java
new file mode 100644
index 00000000000..f532dddce0f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcaseBuilder.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.testutils;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.logaggregation.TestAggregatedLogDeletionService.ALL_FILE_CONTROLLER_NAMES;
+
+/**
+ * Fluent builder collecting the settings of a {@link LogAggregationTestcase}.
+ * Every {@code with*} method stores its arguments and returns the builder;
+ * {@link #build()} hands the builder over to the testcase constructor.
+ */
+public class LogAggregationTestcaseBuilder {
+  /** Sentinel timeout value signalling that a verification should not wait. */
+  public static final long NO_TIMEOUT = -1;
+  /** Wall-clock time captured when the builder was created. */
+  final long now;
+  final Configuration conf;
+  Path remoteRootLogPath;
+  String suffix;
+  String userDirName;
+  long userDirModTime;
+  /** Exceptions to inject, keyed by the index given to the inject method. */
+  final Map<Integer, Exception> injectedAppDirDeletionExceptions = new HashMap<>();
+  List<String> fileControllers;
+  long suffixDirModTime;
+  long bucketDirModTime;
+  String suffixDirName;
+  List<AppDescriptor> apps = Lists.newArrayList();
+  int[] finishedAppIds;
+  int[] runningAppIds;
+  PathWithFileStatus userDir;
+  PathWithFileStatus suffixDir;
+  PathWithFileStatus bucketDir;
+  String bucketId;
+  List<Pair<String, Long>> additionalAppDirs = new ArrayList<>();
+  FileSystem rootFs;
+
+  /**
+   * @param conf configuration stored in the builder
+   */
+  public LogAggregationTestcaseBuilder(Configuration conf) {
+    this.now = System.currentTimeMillis();
+    this.conf = conf;
+  }
+
+  /**
+   * Static factory equivalent to calling the constructor.
+   *
+   * @param conf configuration stored in the builder
+   * @return a new builder
+   */
+  public static LogAggregationTestcaseBuilder create(Configuration conf) {
+    return new LogAggregationTestcaseBuilder(conf);
+  }
+
+  /**
+   * Resolves the file system of the given root path and stores it.
+   *
+   * @param root root path as a string
+   * @return this builder
+   * @throws IOException if the file system cannot be obtained
+   */
+  public LogAggregationTestcaseBuilder withRootPath(String root) throws IOException {
+    this.rootFs = new Path(root).getFileSystem(this.conf);
+    return this;
+  }
+
+  /**
+   * @param remoteRootLogDir remote root log directory as a string
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withRemoteRootLogPath(String remoteRootLogDir) {
+    this.remoteRootLogPath = new Path(remoteRootLogDir);
+    return this;
+  }
+
+  /**
+   * @param userDirName name of the user directory
+   * @param modTime modification time of the user directory
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withUserDir(String userDirName, long modTime) {
+    this.userDirModTime = modTime;
+    this.userDirName = userDirName;
+    return this;
+  }
+
+  /**
+   * Stores the suffix and derives the suffix directory name by prepending
+   * {@code LogAggregationUtils.getBucketSuffix()} to it.
+   *
+   * @param suffix the suffix
+   * @param modTime modification time of the suffix directory
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withSuffixDir(String suffix, long modTime) {
+    final String bucketSuffix = LogAggregationUtils.getBucketSuffix();
+    this.suffix = suffix;
+    this.suffixDirName = bucketSuffix + suffix;
+    this.suffixDirModTime = modTime;
+    return this;
+  }
+
+  /**
+   * Bucket dir paths will be generated later.
+   * @param modTime The modification time
+   * @return The builder
+   */
+  public LogAggregationTestcaseBuilder withBucketDir(long modTime) {
+    this.bucketDirModTime = modTime;
+    return this;
+  }
+
+  /**
+   * Same as {@link #withBucketDir(long)}, additionally storing the bucket id.
+   *
+   * @param modTime The modification time
+   * @param bucketId The bucket id
+   * @return The builder
+   */
+  public LogAggregationTestcaseBuilder withBucketDir(long modTime, String bucketId) {
+    withBucketDir(modTime);
+    this.bucketId = bucketId;
+    return this;
+  }
+
+  /**
+   * @param apps application descriptors; the list is stored as is, not copied
+   * @return this builder
+   */
+  public final LogAggregationTestcaseBuilder withApps(List<AppDescriptor> apps) {
+    this.apps = apps;
+    return this;
+  }
+
+  /**
+   * @param apps ids of the applications considered finished
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withFinishedApps(int... apps) {
+    this.finishedAppIds = apps;
+    return this;
+  }
+
+  /**
+   * @param apps ids of the applications considered running
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withRunningApps(int... apps) {
+    this.runningAppIds = apps;
+    return this;
+  }
+
+  /**
+   * Selects {@code ALL_FILE_CONTROLLER_NAMES} as the file controllers.
+   *
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withBothFileControllers() {
+    this.fileControllers = ALL_FILE_CONTROLLER_NAMES;
+    return this;
+  }
+
+  /**
+   * @param appDirs additional application directories as (name, modification time)
+   *                pairs; the list is stored as is, not copied
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder withAdditionalAppDirs(List<Pair<String, Long>> appDirs) {
+    this.additionalAppDirs = appDirs;
+    return this;
+  }
+
+  /**
+   * Registers a separate {@link AccessControlException} instance for each given index.
+   *
+   * @param indices keys under which the exceptions are registered
+   * @return this builder
+   */
+  public LogAggregationTestcaseBuilder injectExceptionForAppDirDeletion(int... indices) {
+    for (int index : indices) {
+      this.injectedAppDirDeletionExceptions.put(index,
+              new AccessControlException("Injected Error\nStack Trace :("));
+    }
+    return this;
+  }
+
+  /**
+   * @return a new testcase constructed from this builder
+   * @throws IOException if constructing the testcase fails
+   */
+  public LogAggregationTestcase build() throws IOException {
+    return new LogAggregationTestcase(this);
+  }
+
+  /**
+   * Describes one application: the modification time of its directory, its
+   * files with their modification dates and, optionally, a file controller.
+   */
+  public static final class AppDescriptor {
+    final long modTimeOfAppDir;
+    List<Pair<String, Long>> filesWithModDate = new ArrayList<>();
+    String fileController;
+
+    /**
+     * Creates a descriptor with an empty file list.
+     *
+     * @param modTimeOfAppDir modification time of the application directory
+     */
+    public AppDescriptor(long modTimeOfAppDir) {
+      this.modTimeOfAppDir = modTimeOfAppDir;
+    }
+
+    /**
+     * @param modTimeOfAppDir modification time of the application directory
+     * @param filesWithModDate (file name, modification date) pairs
+     */
+    public AppDescriptor(long modTimeOfAppDir, List<Pair<String, Long>> filesWithModDate) {
+      this(modTimeOfAppDir);
+      this.filesWithModDate = filesWithModDate;
+    }
+
+    /**
+     * @param fileController file controller of the application
+     * @param modTimeOfAppDir modification time of the application directory
+     * @param filesWithModDate (file name, modification date) pairs
+     */
+    public AppDescriptor(String fileController, long modTimeOfAppDir,
+                         List<Pair<String, Long>> filesWithModDate) {
+      this(modTimeOfAppDir, filesWithModDate);
+      this.fileController = fileController;
+    }
+
+
+    /**
+     * @param now first argument passed to {@code ApplicationId.newInstance}
+     * @param id second argument passed to {@code ApplicationId.newInstance}
+     * @return the created application id
+     */
+    public ApplicationId createApplicationId(long now, int id) {
+      return ApplicationId.newInstance(now, id);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
new file mode 100644
index 00000000000..c3f69c2a67d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.testutils;
+
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Static helpers creating Mockito based {@link ApplicationClientProtocol}
+ * mocks that report applications as finished or running.
+ */
+public final class MockRMClientUtils {
+  private MockRMClientUtils() {
+    // Static utility class, not meant to be instantiated.
+  }
+
+  /**
+   * Creates a mock RM client whose {@code getApplicationReport} is stubbed for
+   * every given application. Finished applications are stubbed first, so an id
+   * present in both lists ends up being reported as running.
+   *
+   * @param finishedApplications applications reported as FINISHED, may be null or empty
+   * @param runningApplications applications reported as RUNNING, may be null or empty
+   * @return the stubbed mock client
+   * @throws Exception declared by the stubbed protocol method
+   */
+  public static ApplicationClientProtocol createMockRMClient(
+          List<ApplicationId> finishedApplications,
+          List<ApplicationId> runningApplications) throws Exception {
+    final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
+    stubApplicationReports(mockProtocol, finishedApplications, YarnApplicationState.FINISHED);
+    stubApplicationReports(mockProtocol, runningApplications, YarnApplicationState.RUNNING);
+    return mockProtocol;
+  }
+
+  /**
+   * @return a mocked response whose report is in state RUNNING
+   */
+  public static GetApplicationReportResponse createApplicationReportWithRunningApplication() {
+    return createApplicationReportResponse(YarnApplicationState.RUNNING);
+  }
+
+  /**
+   * @return a mocked response whose report is in state FINISHED
+   */
+  public static GetApplicationReportResponse createApplicationReportWithFinishedApplication() {
+    return createApplicationReportResponse(YarnApplicationState.FINISHED);
+  }
+
+  /** Stubs getApplicationReport on the mock for each application with the given state. */
+  private static void stubApplicationReports(ApplicationClientProtocol mockProtocol,
+          List<ApplicationId> applications, YarnApplicationState state) throws Exception {
+    if (applications == null) {
+      return;
+    }
+    for (ApplicationId appId : applications) {
+      GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
+      // The response mock is stubbed internally, so it has to be fully created
+      // before the when(...) call below starts a new stubbing.
+      GetApplicationReportResponse response = createApplicationReportResponse(state);
+      when(mockProtocol.getApplicationReport(request)).thenReturn(response);
+    }
+  }
+
+  /** Creates a mocked response wrapping a mocked report with the given state. */
+  private static GetApplicationReportResponse createApplicationReportResponse(
+          YarnApplicationState state) {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(report.getYarnApplicationState()).thenReturn(state);
+    GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
+    when(response.getApplicationReport()).thenReturn(report);
+    return response;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/PathWithFileStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/PathWithFileStatus.java
new file mode 100644
index 00000000000..5e743f1138e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/PathWithFileStatus.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.testutils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Pairs a {@link Path} with a {@link FileStatus}; the status can be replaced
+ * by one with a different modification time.
+ */
+public class PathWithFileStatus {
+  public final Path path;
+  public FileStatus fileStatus;
+
+  /**
+   * @param path the path
+   * @param fileStatus the status stored alongside the path
+   */
+  public PathWithFileStatus(Path path, FileStatus fileStatus) {
+    this.fileStatus = fileStatus;
+    this.path = path;
+  }
+
+  /**
+   * Replaces the stored status with a new {@link FileStatus} built from the
+   * previous status' length, directory flag, replication, block size and path,
+   * combined with the given modification time.
+   *
+   * @param modTime the new modification time
+   */
+  public void changeModificationTime(long modTime) {
+    final FileStatus previous = this.fileStatus;
+    this.fileStatus = new FileStatus(previous.getLen(), previous.isDirectory(),
+            previous.getReplication(), previous.getBlockSize(), modTime, previous.getPath());
+  }
+
+  /**
+   * @return a string containing only the path, not the file status
+   */
+  @Override
+  public String toString() {
+    return "PathWithFileStatus{path=" + path + '}';
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org