You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by rk...@apache.org on 2015/09/26 00:02:46 UTC

hadoop git commit: MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 83e99d06d -> d3c49e766


MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3c49e76
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3c49e76
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3c49e76

Branch: refs/heads/trunk
Commit: d3c49e76624b7e42a1321c649a1d7bb9906b3073
Parents: 83e99d0
Author: Robert Kanter <rk...@apache.org>
Authored: Fri Sep 25 14:59:49 2015 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Fri Sep 25 15:02:42 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   2 +
 .../dev-support/findbugs-exclude.xml            |  32 +++
 hadoop-tools/hadoop-archive-logs/pom.xml        |  18 ++
 .../apache/hadoop/tools/HadoopArchiveLogs.java  | 243 +++++++++++++++----
 .../hadoop/tools/TestHadoopArchiveLogs.java     | 231 +++++++++++-------
 5 files changed, 392 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3c49e76/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 4bcd5d8..b7e9016 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -580,6 +580,8 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6484. Yarn Client uses local address instead of RM address as
     token renewer in a secure cluster when RM HA is enabled. (Zhihai Xu)
 
+   MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3c49e76/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000..7f2064e
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+  <!--
+   Ignore warnings for usage of System.exit. These are appropriate.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.tools.HadoopArchiveLogs" />
+    <Method name="handleOpts" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+  <Match>
+    <Class name="org.apache.hadoop.tools.HadoopArchiveLogs" />
+    <Method name="run" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3c49e76/hadoop-tools/hadoop-archive-logs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/pom.xml b/hadoop-tools/hadoop-archive-logs/pom.xml
index 2a480a8..7e7da77 100644
--- a/hadoop-tools/hadoop-archive-logs/pom.xml
+++ b/hadoop-tools/hadoop-archive-logs/pom.xml
@@ -119,6 +119,12 @@
     </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>test</scope>
@@ -166,6 +172,18 @@
          </archive>
         </configuration>
        </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>
+            ${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3c49e76/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
index 4778dcb..0879d41 100644
--- a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
@@ -26,12 +26,14 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.output.FileWriterWithEncoding;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
@@ -43,13 +45,15 @@ import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
 import org.apache.hadoop.yarn.applications.distributedshell.Client;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -71,6 +75,7 @@ public class HadoopArchiveLogs implements Tool {
   private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
   private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
   private static final String MEMORY_OPTION = "memory";
+  private static final String VERBOSE_OPTION = "verbose";
 
   private static final int DEFAULT_MAX_ELIGIBLE = -1;
   private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@@ -85,9 +90,10 @@ public class HadoopArchiveLogs implements Tool {
   long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
   @VisibleForTesting
   long memory = DEFAULT_MEMORY;
+  private boolean verbose = false;
 
   @VisibleForTesting
-  Set<ApplicationReport> eligibleApplications;
+  Set<AppInfo> eligibleApplications;
 
   private JobConf conf;
 
@@ -122,17 +128,20 @@ public class HadoopArchiveLogs implements Tool {
   public int run(String[] args) throws Exception {
     handleOpts(args);
 
-    findAggregatedApps();
-
     FileSystem fs = null;
     Path remoteRootLogDir = new Path(conf.get(
         YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
     String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
     Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+    if (verbose) {
+      LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
+      LOG.info("Log Suffix: " + suffix);
+      LOG.info("Working Dir: " + workingDir);
+    }
     try {
       fs = FileSystem.get(conf);
-      checkFiles(fs, remoteRootLogDir, suffix);
+      checkFilesAndSeedApps(fs, remoteRootLogDir, suffix);
 
       // Prepare working directory
       if (fs.exists(workingDir)) {
@@ -147,6 +156,8 @@ public class HadoopArchiveLogs implements Tool {
       }
     }
 
+    filterAppsByAggregatedStatus();
+
     checkMaxEligible();
 
     if (eligibleApplications.isEmpty()) {
@@ -156,8 +167,8 @@ public class HadoopArchiveLogs implements Tool {
 
     StringBuilder sb =
         new StringBuilder("Will process the following applications:");
-    for (ApplicationReport report : eligibleApplications) {
-      sb.append("\n\t").append(report.getApplicationId());
+    for (AppInfo app : eligibleApplications) {
+      sb.append("\n\t").append(app.getAppId());
     }
     LOG.info(sb.toString());
 
@@ -189,11 +200,14 @@ public class HadoopArchiveLogs implements Tool {
         "The amount of memory (in megabytes) for each container (default: "
             + DEFAULT_MEMORY + ")");
     memoryOpt.setArgName("megabytes");
+    Option verboseOpt = new Option(VERBOSE_OPTION, false,
+        "Print more details.");
     opts.addOption(helpOpt);
     opts.addOption(maxEligibleOpt);
     opts.addOption(minNumLogFilesOpt);
     opts.addOption(maxTotalLogsSizeOpt);
     opts.addOption(memoryOpt);
+    opts.addOption(verboseOpt);
 
     try {
       CommandLineParser parser = new GnuParser();
@@ -225,6 +239,9 @@ public class HadoopArchiveLogs implements Tool {
       if (commandLine.hasOption(MEMORY_OPTION)) {
         memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
       }
+      if (commandLine.hasOption(VERBOSE_OPTION)) {
+        verbose = true;
+      }
     } catch (ParseException pe) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp("yarn archive-logs", opts);
@@ -233,17 +250,39 @@ public class HadoopArchiveLogs implements Tool {
   }
 
   @VisibleForTesting
-  void findAggregatedApps() throws IOException, YarnException {
+  void filterAppsByAggregatedStatus() throws IOException, YarnException {
     YarnClient client = YarnClient.createYarnClient();
     try {
       client.init(getConf());
       client.start();
-      List<ApplicationReport> reports = client.getApplications();
-      for (ApplicationReport report : reports) {
-        LogAggregationStatus aggStatus = report.getLogAggregationStatus();
-        if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
-            aggStatus.equals(LogAggregationStatus.FAILED)) {
-          eligibleApplications.add(report);
+      for (Iterator<AppInfo> it = eligibleApplications.iterator();
+           it.hasNext();) {
+        AppInfo app = it.next();
+        try {
+          ApplicationReport report = client.getApplicationReport(
+              ConverterUtils.toApplicationId(app.getAppId()));
+          LogAggregationStatus aggStatus = report.getLogAggregationStatus();
+          if (aggStatus.equals(LogAggregationStatus.RUNNING) ||
+              aggStatus.equals(LogAggregationStatus.RUNNING_WITH_FAILURE) ||
+              aggStatus.equals(LogAggregationStatus.NOT_START) ||
+              aggStatus.equals(LogAggregationStatus.DISABLED) ||
+              aggStatus.equals(LogAggregationStatus.FAILED)) {
+            if (verbose) {
+              LOG.info("Skipping " + app.getAppId() +
+                  " due to aggregation status being " + aggStatus);
+            }
+            it.remove();
+          } else {
+            if (verbose) {
+              LOG.info(app.getAppId() + " has aggregation status " + aggStatus);
+            }
+            app.setFinishTime(report.getFinishTime());
+          }
+        } catch (ApplicationNotFoundException e) {
+          // Assume the aggregation has finished
+          if (verbose) {
+            LOG.info(app.getAppId() + " not in the ResourceManager");
+          }
         }
       }
     } finally {
@@ -254,33 +293,71 @@ public class HadoopArchiveLogs implements Tool {
   }
 
   @VisibleForTesting
-  void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
-    for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
-         reportIt.hasNext(); ) {
-      ApplicationReport report = reportIt.next();
-      long totalFileSize = 0L;
+  void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir,
+       String suffix) throws IOException {
+    for (RemoteIterator<FileStatus> userIt =
+         fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) {
+      Path userLogPath = userIt.next().getPath();
       try {
-        FileStatus[] files = fs.listStatus(
-            LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
-                report.getApplicationId(), report.getUser(), suffix));
-        if (files.length < minNumLogFiles) {
-          reportIt.remove();
-        } else {
-          for (FileStatus file : files) {
-            if (file.getPath().getName().equals(report.getApplicationId()
-                + ".har")) {
-              reportIt.remove();
-              break;
+        for (RemoteIterator<FileStatus> appIt =
+             fs.listStatusIterator(new Path(userLogPath, suffix));
+             appIt.hasNext();) {
+          Path appLogPath = appIt.next().getPath();
+          try {
+            FileStatus[] files = fs.listStatus(appLogPath);
+            if (files.length >= minNumLogFiles) {
+              boolean eligible = true;
+              long totalFileSize = 0L;
+              for (FileStatus file : files) {
+                if (file.getPath().getName().equals(appLogPath.getName()
+                    + ".har")) {
+                  eligible = false;
+                  if (verbose) {
+                    LOG.info("Skipping " + appLogPath.getName() +
+                        " due to existing .har file");
+                  }
+                  break;
+                }
+                totalFileSize += file.getLen();
+                if (totalFileSize > maxTotalLogsSize) {
+                  eligible = false;
+                  if (verbose) {
+                    LOG.info("Skipping " + appLogPath.getName() + " due to " +
+                        "total file size being too large (" + totalFileSize +
+                        " > " + maxTotalLogsSize + ")");
+                  }
+                  break;
+                }
+              }
+              if (eligible) {
+                if (verbose) {
+                  LOG.info("Adding " + appLogPath.getName() + " for user " +
+                      userLogPath.getName());
+                }
+                eligibleApplications.add(
+                    new AppInfo(appLogPath.getName(), userLogPath.getName()));
+              }
+            } else {
+              if (verbose) {
+                LOG.info("Skipping " + appLogPath.getName() + " due to not " +
+                    "having enough log files (" + files.length + " < " +
+                    minNumLogFiles + ")");
+              }
+            }
+          } catch (IOException ioe) {
+            // Ignore any apps we can't read
+            if (verbose) {
+              LOG.info("Skipping logs under " + appLogPath + " due to " +
+                  ioe.getMessage());
             }
-            totalFileSize += file.getLen();
-          }
-          if (totalFileSize > maxTotalLogsSize) {
-            reportIt.remove();
           }
         }
       } catch (IOException ioe) {
-        // If the user doesn't have permission or it doesn't exist, then skip it
-        reportIt.remove();
+        // Ignore any apps we can't read
+        if (verbose) {
+          LOG.info("Skipping all logs under " + userLogPath + " due to " +
+              ioe.getMessage());
+        }
       }
     }
   }
@@ -289,15 +366,26 @@ public class HadoopArchiveLogs implements Tool {
   void checkMaxEligible() {
     // If we have too many eligible apps, remove the newest ones first
     if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
-      List<ApplicationReport> sortedApplications =
-          new ArrayList<ApplicationReport>(eligibleApplications);
-      Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
+      if (verbose) {
+        LOG.info("Too many applications (" + eligibleApplications.size() +
+            " > " + maxEligible + ")");
+      }
+      List<AppInfo> sortedApplications =
+          new ArrayList<AppInfo>(eligibleApplications);
+      Collections.sort(sortedApplications, new Comparator<AppInfo>() {
         @Override
-        public int compare(ApplicationReport o1, ApplicationReport o2) {
-          return Long.compare(o1.getFinishTime(), o2.getFinishTime());
+        public int compare(AppInfo o1, AppInfo o2) {
+          int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime());
+          if (lCompare == 0) {
+            return o1.getAppId().compareTo(o2.getAppId());
+          }
+          return lCompare;
         }
       });
       for (int i = maxEligible; i < sortedApplications.size(); i++) {
+        if (verbose) {
+          LOG.info("Removing " + sortedApplications.get(i));
+        }
         eligibleApplications.remove(sortedApplications.get(i));
       }
     }
@@ -325,24 +413,26 @@ public class HadoopArchiveLogs implements Tool {
   @VisibleForTesting
   void generateScript(File localScript, Path workingDir,
         Path remoteRootLogDir, String suffix) throws IOException {
-    LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    if (verbose) {
+      LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    }
     String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
         .getCodeSource().getLocation().getPath();
     String harJarPath = HadoopArchives.class.getProtectionDomain()
         .getCodeSource().getLocation().getPath();
     String classpath = halrJarPath + File.pathSeparator + harJarPath;
-    FileWriter fw = null;
+    FileWriterWithEncoding fw = null;
     try {
-      fw = new FileWriter(localScript);
+      fw = new FileWriterWithEncoding(localScript, "UTF-8");
       fw.write("#!/bin/bash\nset -e\nset -x\n");
       int containerCount = 1;
-      for (ApplicationReport report : eligibleApplications) {
+      for (AppInfo app : eligibleApplications) {
         fw.write("if [ \"$YARN_SHELL_ID\" == \"");
         fw.write(Integer.toString(containerCount));
         fw.write("\" ]; then\n\tappId=\"");
-        fw.write(report.getApplicationId().toString());
+        fw.write(app.getAppId());
         fw.write("\"\n\tuser=\"");
-        fw.write(report.getUser());
+        fw.write(app.getUser());
         fw.write("\"\nel");
         containerCount++;
       }
@@ -382,6 +472,10 @@ public class HadoopArchiveLogs implements Tool {
         "--shell_script",
         localScript.getAbsolutePath()
     };
+    if (verbose) {
+      LOG.info("Running Distributed Shell with arguments: " +
+          Arrays.toString(dsArgs));
+    }
     final Client dsClient = new Client(new Configuration(conf));
     dsClient.init(dsArgs);
     return dsClient.run();
@@ -400,4 +494,59 @@ public class HadoopArchiveLogs implements Tool {
   public Configuration getConf() {
     return this.conf;
   }
+
+  @VisibleForTesting
+  static class AppInfo {
+    private String appId;
+    private String user;
+    private long finishTime;
+
+    AppInfo(String appId, String user) {
+      this.appId = appId;
+      this.user = user;
+      this.finishTime = 0L;
+    }
+
+    public String getAppId() {
+      return appId;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public long getFinishTime() {
+      return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+      this.finishTime = finishTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      AppInfo appInfo = (AppInfo) o;
+
+      if (appId != null
+          ? !appId.equals(appInfo.appId) : appInfo.appId != null) {
+        return false;
+      }
+      return !(user != null
+          ? !user.equals(appInfo.user) : appInfo.user != null);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = appId != null ? appId.hashCode() : 0;
+      result = 31 * result + (user != null ? user.hashCode() : 0);
+      return result;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3c49e76/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
index c8ff201..7423f79 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -23,15 +23,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -47,6 +44,7 @@ import java.util.Random;
 public class TestHadoopArchiveLogs {
 
   private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
+  private static final String USER = System.getProperty("user.name");
   private static final int FILE_SIZE_INCREMENT = 4096;
   private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
   static {
@@ -54,96 +52,117 @@ public class TestHadoopArchiveLogs {
   }
 
   @Test(timeout = 10000)
-  public void testCheckFiles() throws Exception {
+  public void testCheckFilesAndSeedApps() throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     FileSystem fs = FileSystem.getLocal(conf);
     Path rootLogDir = new Path("target", "logs");
     String suffix = "logs";
-    Path logDir = new Path(rootLogDir,
-        new Path(System.getProperty("user.name"), suffix));
+    Path logDir = new Path(rootLogDir, new Path(USER, suffix));
     fs.mkdirs(logDir);
 
-    Assert.assertEquals(0, hal.eligibleApplications.size());
-    ApplicationReport app1 = createAppReport(1);  // no files found
-    ApplicationReport app2 = createAppReport(2);  // too few files
-    Path app2Path = new Path(logDir, app2.getApplicationId().toString());
+    // no files found
+    ApplicationId appId1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    Path app1Path = new Path(logDir, appId1.toString());
+    fs.mkdirs(app1Path);
+    // too few files
+    ApplicationId appId2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+    Path app2Path = new Path(logDir, appId2.toString());
     fs.mkdirs(app2Path);
     createFile(fs, new Path(app2Path, "file1"), 1);
     hal.minNumLogFiles = 2;
-    ApplicationReport app3 = createAppReport(3);  // too large
-    Path app3Path = new Path(logDir, app3.getApplicationId().toString());
+    // too large
+    ApplicationId appId3 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3);
+    Path app3Path = new Path(logDir, appId3.toString());
     fs.mkdirs(app3Path);
     createFile(fs, new Path(app3Path, "file1"), 2);
     createFile(fs, new Path(app3Path, "file2"), 5);
     hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
-    ApplicationReport app4 = createAppReport(4);  // has har already
-    Path app4Path = new Path(logDir, app4.getApplicationId().toString());
+    // has har already
+    ApplicationId appId4 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4);
+    Path app4Path = new Path(logDir, appId4.toString());
     fs.mkdirs(app4Path);
-    createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
-    ApplicationReport app5 = createAppReport(5);  // just right
-    Path app5Path = new Path(logDir, app5.getApplicationId().toString());
+    createFile(fs, new Path(app4Path, appId4 + ".har"), 1);
+    // just right
+    ApplicationId appId5 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5);
+    Path app5Path = new Path(logDir, appId5.toString());
     fs.mkdirs(app5Path);
     createFile(fs, new Path(app5Path, "file1"), 2);
     createFile(fs, new Path(app5Path, "file2"), 3);
-    hal.eligibleApplications.add(app1);
-    hal.eligibleApplications.add(app2);
-    hal.eligibleApplications.add(app3);
-    hal.eligibleApplications.add(app4);
-    hal.eligibleApplications.add(app5);
 
-    hal.checkFiles(fs, rootLogDir, suffix);
+    Assert.assertEquals(0, hal.eligibleApplications.size());
+    hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
     Assert.assertEquals(1, hal.eligibleApplications.size());
-    Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
+    Assert.assertEquals(appId5.toString(),
+        hal.eligibleApplications.iterator().next().getAppId());
   }
 
   @Test(timeout = 10000)
   public void testCheckMaxEligible() throws Exception {
     Configuration conf = new Configuration();
-    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
-    ApplicationReport app1 = createAppReport(1);
+    HadoopArchiveLogs.AppInfo app1 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1).toString(), USER);
     app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
-    ApplicationReport app2 = createAppReport(2);
+    HadoopArchiveLogs.AppInfo app2 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2).toString(), USER);
     app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
-    ApplicationReport app3 = createAppReport(3);
-    app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
-    ApplicationReport app4 = createAppReport(4);
-    app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
-    ApplicationReport app5 = createAppReport(5);
-    app5.setFinishTime(CLUSTER_TIMESTAMP);
+    HadoopArchiveLogs.AppInfo app3 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3).toString(), USER);
+    // app3 has no finish time set
+    HadoopArchiveLogs.AppInfo app4 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4).toString(), USER);
+    app4.setFinishTime(CLUSTER_TIMESTAMP + 5);
+    HadoopArchiveLogs.AppInfo app5 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5).toString(), USER);
+    app5.setFinishTime(CLUSTER_TIMESTAMP + 10);
+    HadoopArchiveLogs.AppInfo app6 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 6).toString(), USER);
+    // app6 has no finish time set
+    HadoopArchiveLogs.AppInfo app7 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 7).toString(), USER);
+    app7.setFinishTime(CLUSTER_TIMESTAMP);
+    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     Assert.assertEquals(0, hal.eligibleApplications.size());
     hal.eligibleApplications.add(app1);
     hal.eligibleApplications.add(app2);
     hal.eligibleApplications.add(app3);
     hal.eligibleApplications.add(app4);
     hal.eligibleApplications.add(app5);
+    hal.eligibleApplications.add(app6);
+    hal.eligibleApplications.add(app7);
+    Assert.assertEquals(7, hal.eligibleApplications.size());
     hal.maxEligible = -1;
     hal.checkMaxEligible();
+    Assert.assertEquals(7, hal.eligibleApplications.size());
+    hal.maxEligible = 6;
+    hal.checkMaxEligible();
+    Assert.assertEquals(6, hal.eligibleApplications.size());
+    Assert.assertFalse(hal.eligibleApplications.contains(app5));
+    hal.maxEligible = 5;
+    hal.checkMaxEligible();
     Assert.assertEquals(5, hal.eligibleApplications.size());
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app4));
     hal.maxEligible = 4;
     hal.checkMaxEligible();
     Assert.assertEquals(4, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app4));
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app7));
     hal.maxEligible = 3;
     hal.checkMaxEligible();
     Assert.assertEquals(3, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app3));
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app1));
     hal.maxEligible = 2;
     hal.checkMaxEligible();
     Assert.assertEquals(2, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app5));
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app2));
     hal.maxEligible = 1;
     hal.checkMaxEligible();
     Assert.assertEquals(1, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app1));
+    Assert.assertFalse(hal.eligibleApplications.contains(app6));
+    Assert.assertTrue(hal.eligibleApplications.contains(app3));
   }
 
   @Test(timeout = 10000)
-  public void testFindAggregatedApps() throws Exception {
+  public void testFilterAppsByAggregatedStatus() throws Exception {
     MiniYARNCluster yarnCluster = null;
     try {
       Configuration conf = new Configuration();
@@ -156,32 +175,66 @@ public class TestHadoopArchiveLogs {
       conf = yarnCluster.getConfig();
 
       RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
-      RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
+      RMAppImpl appImpl1 = (RMAppImpl)createRMApp(1, conf, rmContext,
           LogAggregationStatus.DISABLED);
-      RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
+      RMAppImpl appImpl2 = (RMAppImpl)createRMApp(2, conf, rmContext,
           LogAggregationStatus.FAILED);
-      RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
+      RMAppImpl appImpl3 = (RMAppImpl)createRMApp(3, conf, rmContext,
           LogAggregationStatus.NOT_START);
-      RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
+      RMAppImpl appImpl4 = (RMAppImpl)createRMApp(4, conf, rmContext,
           LogAggregationStatus.SUCCEEDED);
-      RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
+      RMAppImpl appImpl5 = (RMAppImpl)createRMApp(5, conf, rmContext,
           LogAggregationStatus.RUNNING);
-      RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
+      RMAppImpl appImpl6 = (RMAppImpl)createRMApp(6, conf, rmContext,
           LogAggregationStatus.RUNNING_WITH_FAILURE);
-      RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
+      RMAppImpl appImpl7 = (RMAppImpl)createRMApp(7, conf, rmContext,
           LogAggregationStatus.TIME_OUT);
-      rmContext.getRMApps().put(app1.getApplicationId(), app1);
-      rmContext.getRMApps().put(app2.getApplicationId(), app2);
-      rmContext.getRMApps().put(app3.getApplicationId(), app3);
-      rmContext.getRMApps().put(app4.getApplicationId(), app4);
-      rmContext.getRMApps().put(app5.getApplicationId(), app5);
-      rmContext.getRMApps().put(app6.getApplicationId(), app6);
-      rmContext.getRMApps().put(app7.getApplicationId(), app7);
+      RMAppImpl appImpl8 = (RMAppImpl)createRMApp(8, conf, rmContext,
+          LogAggregationStatus.SUCCEEDED);
+      rmContext.getRMApps().put(appImpl1.getApplicationId(), appImpl1);
+      rmContext.getRMApps().put(appImpl2.getApplicationId(), appImpl2);
+      rmContext.getRMApps().put(appImpl3.getApplicationId(), appImpl3);
+      rmContext.getRMApps().put(appImpl4.getApplicationId(), appImpl4);
+      rmContext.getRMApps().put(appImpl5.getApplicationId(), appImpl5);
+      rmContext.getRMApps().put(appImpl6.getApplicationId(), appImpl6);
+      rmContext.getRMApps().put(appImpl7.getApplicationId(), appImpl7);
+      // appImpl8 is not in the RM
 
       HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
       Assert.assertEquals(0, hal.eligibleApplications.size());
-      hal.findAggregatedApps();
-      Assert.assertEquals(2, hal.eligibleApplications.size());
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl1.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl2.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl3.getApplicationId().toString(),
+              USER));
+      HadoopArchiveLogs.AppInfo app4 =
+          new HadoopArchiveLogs.AppInfo(appImpl4.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app4);
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl5.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl6.getApplicationId().toString(),
+              USER));
+      HadoopArchiveLogs.AppInfo app7 =
+          new HadoopArchiveLogs.AppInfo(appImpl7.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app7);
+      HadoopArchiveLogs.AppInfo app8 =
+          new HadoopArchiveLogs.AppInfo(appImpl8.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app8);
+      Assert.assertEquals(8, hal.eligibleApplications.size());
+      hal.filterAppsByAggregatedStatus();
+      Assert.assertEquals(3, hal.eligibleApplications.size());
+      Assert.assertTrue(hal.eligibleApplications.contains(app4));
+      Assert.assertTrue(hal.eligibleApplications.contains(app7));
+      Assert.assertTrue(hal.eligibleApplications.contains(app8));
     } finally {
       if (yarnCluster != null) {
         yarnCluster.stop();
@@ -193,10 +246,12 @@ public class TestHadoopArchiveLogs {
   public void testGenerateScript() throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
-    ApplicationReport app1 = createAppReport(1);
-    ApplicationReport app2 = createAppReport(2);
-    hal.eligibleApplications.add(app1);
-    hal.eligibleApplications.add(app2);
+    ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
+        USER));
+    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
+        USER));
 
     File localScript = new File("target", "script.sh");
     Path workingDir = new Path("/tmp", "working");
@@ -213,22 +268,16 @@ public class TestHadoopArchiveLogs {
     Assert.assertEquals("set -e", lines[1]);
     Assert.assertEquals("set -x", lines[2]);
     Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
-    if (lines[4].contains(app1.getApplicationId().toString())) {
-      Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
-          + "\"", lines[4]);
-      Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
-          + "\"", lines[7]);
+    if (lines[4].contains(app1.toString())) {
+      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
+      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
     } else {
-      Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
-          + "\"", lines[4]);
-      Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
-          + "\"", lines[7]);
+      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
+      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
     }
-    Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
-        lines[5]);
+    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
     Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
-    Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
-        lines[8]);
+    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
     Assert.assertEquals("else", lines[9]);
     Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
     Assert.assertEquals("\texit 1", lines[11]);
@@ -241,15 +290,23 @@ public class TestHadoopArchiveLogs {
         remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
   }
 
-  private static ApplicationReport createAppReport(int id) {
-    ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
-    return ApplicationReport.newInstance(
-        appId,
-        ApplicationAttemptId.newInstance(appId, 1),
-        System.getProperty("user.name"),
-        null, null, null, 0, null, YarnApplicationState.FINISHED, null,
-        null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
-        null, null);
+  /**
+   * If this test fails, then a new Log Aggregation Status was added.  Make
+   * sure that {@link HadoopArchiveLogs#filterAppsByAggregatedStatus()} and this test
+   * are updated as well, if necessary.
+   * @throws Exception
+   */
+  @Test(timeout = 5000)
+  public void testStatuses() throws Exception {
+    LogAggregationStatus[] statuses = new LogAggregationStatus[7];
+    statuses[0] = LogAggregationStatus.DISABLED;
+    statuses[1] = LogAggregationStatus.NOT_START;
+    statuses[2] = LogAggregationStatus.RUNNING;
+    statuses[3] = LogAggregationStatus.RUNNING_WITH_FAILURE;
+    statuses[4] = LogAggregationStatus.SUCCEEDED;
+    statuses[5] = LogAggregationStatus.FAILED;
+    statuses[6] = LogAggregationStatus.TIME_OUT;
+    Assert.assertArrayEquals(statuses, LogAggregationStatus.values());
   }
 
   private static void createFile(FileSystem fs, Path p, long sizeMultiple)
@@ -265,6 +322,7 @@ public class TestHadoopArchiveLogs {
         out.close();
       }
     }
+    Assert.assertTrue(fs.exists(p));
   }
 
   private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
@@ -272,11 +330,10 @@ public class TestHadoopArchiveLogs {
     ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
     ApplicationSubmissionContext submissionContext =
         ApplicationSubmissionContext.newInstance(appId, "test", "default",
-            Priority.newInstance(0), null, false, true,
+            Priority.newInstance(0), null, true, true,
             2, Resource.newInstance(10, 2), "test");
     return new RMAppImpl(appId, rmContext, conf, "test",
-        System.getProperty("user.name"), "default", submissionContext,
-        rmContext.getScheduler(),
+        USER, "default", submissionContext, rmContext.getScheduler(),
         rmContext.getApplicationMasterService(),
         System.currentTimeMillis(), "test",
         null, null) {