You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/05/14 19:38:19 UTC

hadoop git commit: YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in RMApps. Contributed by Xuan Gong.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 05ff54c66 -> 15ccd967e


YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in RMApps. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15ccd967
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15ccd967
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15ccd967

Branch: refs/heads/trunk
Commit: 15ccd967ee3e7046a50522089f67ba01f36ec76a
Parents: 05ff54c
Author: Junping Du <ju...@apache.org>
Authored: Thu May 14 10:57:36 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu May 14 10:58:12 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../yarn/api/records/LogAggregationStatus.java  |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../src/main/resources/yarn-default.xml         |   8 +
 .../protocolrecords/LogAggregationReport.java   |  16 +-
 .../protocolrecords/NodeHeartbeatRequest.java   |   7 +-
 .../impl/pb/LogAggregationReportPBImpl.java     |  40 ----
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  82 +++----
 .../hadoop/yarn/server/webapp/AppBlock.java     |  19 +-
 .../yarn_server_common_service_protos.proto     |  14 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  46 +---
 .../logaggregation/AppLogAggregatorImpl.java    |  19 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java | 228 +++++++++++++++----
 .../resourcemanager/rmnode/RMNodeImpl.java      |  13 +-
 .../rmnode/RMNodeStatusEvent.java               |  11 +-
 .../resourcemanager/webapp/RMAppBlock.java      |  11 +-
 .../webapp/RMAppLogAggregationStatusBlock.java  |  37 ++-
 .../TestRMAppLogAggregationStatus.java          | 181 +++++++++++----
 19 files changed, 469 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0346c54..e0f2c52 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -111,6 +111,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
     (Jonathan Eagles via zjshen)
 
+    YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in 
+    RMApps. (Xuan Gong via junping_du)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
index da1230c..1e10972 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
@@ -34,6 +34,8 @@ public enum LogAggregationStatus {
   /** Log Aggregation is Running. */
   RUNNING,
 
+  /** Log Aggregation is Running, but has failures in previous cycles. */
+  RUNNING_WITH_FAILURE,
   /**
    * Log Aggregation is Succeeded. All of the logs have been aggregated
    * successfully.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 94f3e60..52fff14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -718,6 +718,16 @@ public class YarnConfiguration extends Configuration {
       + "proxy-user-privileges.enabled";
   public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
 
+  /**
+   * How many diagnostics/failure messages can be saved in RM for
+   * log aggregation. It also defines the number of diagnostics/failure
+   * messages can be shown in log aggregation web ui.
+   */
+  public static final String RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
+      RM_PREFIX + "max-log-aggregation-diagnostics-in-memory";
+  public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
+      10;
+
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index c45081a..4095676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -204,6 +204,7 @@ enum LogAggregationStatusProto {
   LOG_SUCCEEDED = 4;
   LOG_FAILED = 5;
   LOG_TIME_OUT = 6;
+  LOG_RUNNING_WITH_FAILURE = 7;
 }
 
 message ApplicationAttemptReportProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4d74f76..1dd88bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -674,6 +674,14 @@
     <value>10</value>
   </property>
 
+  <property>
+    <description>Number of diagnostics/failure messages can be saved in RM for
+    log aggregation. It also defines the number of diagnostics/failure
+    messages can be shown in log aggregation web ui.</description>
+    <name>yarn.resourcemanager.max-log-aggregation-diagnostics-in-memory</name>
+    <value>10</value>
+  </property>
+
   <!-- Node Manager Configs -->
   <property>
     <description>The hostname of the NM.</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
index b2270d8..d76f4cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.util.Records;
  * It includes details such as:
  * <ul>
  *   <li>{@link ApplicationId} of the application.</li>
- *   <li>{@link NodeId} of the NodeManager.</li>
  *   <li>{@link LogAggregationStatus}</li>
  *   <li>Diagnostic information</li>
  * </ul>
@@ -45,7 +43,7 @@ public abstract class LogAggregationReport {
   @Public
   @Unstable
   public static LogAggregationReport newInstance(ApplicationId appId,
-      NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+      LogAggregationStatus status, String diagnosticMessage) {
     LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
     report.setApplicationId(appId);
     report.setLogAggregationStatus(status);
@@ -66,18 +64,6 @@ public abstract class LogAggregationReport {
   public abstract void setApplicationId(ApplicationId appId);
 
   /**
-   * Get the <code>NodeId</code>.
-   * @return <code>NodeId</code>
-   */
-  @Public
-  @Unstable
-  public abstract NodeId getNodeId();
-
-  @Public
-  @Unstable
-  public abstract void setNodeId(NodeId nodeId);
-
-  /**
    * Get the <code>LogAggregationStatus</code>.
    * @return <code>LogAggregationStatus</code>
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index 227363f..767e4b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -54,9 +53,9 @@ public abstract class NodeHeartbeatRequest {
   public abstract Set<String> getNodeLabels();
   public abstract void setNodeLabels(Set<String> nodeLabels);
 
-  public abstract Map<ApplicationId, LogAggregationReport>
+  public abstract List<LogAggregationReport>
       getLogAggregationReportsForApps();
 
   public abstract void setLogAggregationReportsForApps(
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
+      List<LogAggregationReport> logAggregationReportsForApps);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
index 75b6eab..ac6ad2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
@@ -22,13 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -45,7 +42,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
   boolean viaProto = false;
 
   private ApplicationId applicationId;
-  private NodeId nodeId;
 
   public LogAggregationReportPBImpl() {
     builder = LogAggregationReportProto.newBuilder();
@@ -89,12 +85,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
           builder.getApplicationId())) {
       builder.setApplicationId(convertToProtoFormat(this.applicationId));
     }
-
-    if (this.nodeId != null
-        && !((NodeIdPBImpl) this.nodeId).getProto().equals(
-          builder.getNodeId())) {
-      builder.setNodeId(convertToProtoFormat(this.nodeId));
-    }
   }
 
   private void mergeLocalToProto() {
@@ -191,34 +181,4 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
     }
     builder.setDiagnostics(diagnosticMessage);
   }
-
-  @Override
-  public NodeId getNodeId() {
-    if (this.nodeId != null) {
-      return this.nodeId;
-    }
-
-    LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasNodeId()) {
-      return null;
-    }
-    this.nodeId = convertFromProtoFormat(p.getNodeId());
-    return this.nodeId;
-  }
-
-  @Override
-  public void setNodeId(NodeId nodeId) {
-    maybeInitBuilder();
-    if (nodeId == null)
-      builder.clearNodeId();
-    this.nodeId = nodeId;
-  }
-
-  private NodeIdProto convertToProtoFormat(NodeId t) {
-    return ((NodeIdPBImpl) t).getProto();
-  }
-
-  private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
-    return new NodeIdPBImpl(nodeId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 03db39c..81f173d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,21 +18,16 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -51,9 +46,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
   private Set<String> labels = null;
-  private Map<ApplicationId, LogAggregationReport>
-      logAggregationReportsForApps = null;
-  
+  private List<LogAggregationReport> logAggregationReportsForApps = null;
+
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
   }
@@ -110,12 +104,35 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private void addLogAggregationStatusForAppsToProto() {
     maybeInitBuilder();
     builder.clearLogAggregationReportsForApps();
-    for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps
-      .entrySet()) {
-      builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
-        .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
-        .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+    if (this.logAggregationReportsForApps == null) {
+      return;
     }
+    Iterable<LogAggregationReportProto> it =
+        new Iterable<LogAggregationReportProto>() {
+          @Override
+          public Iterator<LogAggregationReportProto> iterator() {
+            return new Iterator<LogAggregationReportProto>() {
+              private Iterator<LogAggregationReport> iter =
+                  logAggregationReportsForApps.iterator();
+
+              @Override
+              public boolean hasNext() {
+                return iter.hasNext();
+              }
+
+              @Override
+              public LogAggregationReportProto next() {
+                return convertToProtoFormat(iter.next());
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        };
+    builder.addAllLogAggregationReportsForApps(it);
   }
 
   private LogAggregationReportProto convertToProtoFormat(
@@ -246,17 +263,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     labels = new HashSet<String>(nodeLabels.getElementsList());
   }
 
-  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
-    return new ApplicationIdPBImpl(p);
-  }
-
-  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
-    return ((ApplicationIdPBImpl) t).getProto();
-  }
-
   @Override
-  public Map<ApplicationId, LogAggregationReport>
-      getLogAggregationReportsForApps() {
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
     if (this.logAggregationReportsForApps != null) {
       return this.logAggregationReportsForApps;
     }
@@ -266,15 +274,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
 
   private void initLogAggregationReportsForApps() {
     NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
-    List<LogAggregationReportsForAppsProto> list =
+    List<LogAggregationReportProto> list =
         p.getLogAggregationReportsForAppsList();
-    this.logAggregationReportsForApps =
-        new HashMap<ApplicationId, LogAggregationReport>();
-    for (LogAggregationReportsForAppsProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      LogAggregationReport report =
-          convertFromProtoFormat(c.getLogAggregationReport());
-      this.logAggregationReportsForApps.put(appId, report);
+    this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
+    for (LogAggregationReportProto c : list) {
+      this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
     }
   }
 
@@ -285,14 +289,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
 
   @Override
   public void setLogAggregationReportsForApps(
-      Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
-    if (logAggregationStatusForApps == null
-        || logAggregationStatusForApps.isEmpty()) {
-      return;
+      List<LogAggregationReport> logAggregationStatusForApps) {
+    if(logAggregationStatusForApps == null) {
+      builder.clearLogAggregationReportsForApps();
     }
-    maybeInitBuilder();
-    this.logAggregationReportsForApps =
-        new HashMap<ApplicationId, LogAggregationReport>();
-    this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+    this.logAggregationReportsForApps = logAggregationStatusForApps;
   }
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
index dd5a4c8..f46197e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
@@ -192,8 +193,17 @@ public class AppBlock extends HtmlBlock {
             : "ApplicationMaster");
     if (webUiType != null
         && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
-      overviewTable._("Log Aggregation Status",
-        root_url("logaggregationstatus", app.getAppId()), "Status");
+      LogAggregationStatus status = getLogAggregationStatus();
+      if (status == null) {
+        overviewTable._("Log Aggregation Status", "N/A");
+      } else if (status == LogAggregationStatus.DISABLED
+          || status == LogAggregationStatus.NOT_START
+          || status == LogAggregationStatus.SUCCEEDED) {
+        overviewTable._("Log Aggregation Status", status.name());
+      } else {
+        overviewTable._("Log Aggregation Status",
+            root_url("logaggregationstatus", app.getAppId()), status.name());
+      }
     }
     overviewTable._("Diagnostics:",
         app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
@@ -342,4 +352,9 @@ public class AppBlock extends HtmlBlock {
   protected void createApplicationMetricsTable(Block html) {
 
   }
+
+  // This will be overrided in RMAppBlock
+  protected LogAggregationStatus getLogAggregationStatus() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d34c9f7..c027ac0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -50,19 +50,13 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_container_token_master_key = 2;
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional StringArrayProto nodeLabels = 4;
-  repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
-}
-
-message LogAggregationReportsForAppsProto {
-  optional ApplicationIdProto appId = 1;
-  optional LogAggregationReportProto log_aggregation_report = 2;
+  repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
 }
 
 message LogAggregationReportProto {
-optional ApplicationIdProto application_id = 1;
-optional NodeIdProto node_id = 2;
-optional LogAggregationStatusProto log_aggregation_status = 3;
-optional string diagnostics = 4 [default = "N/A"];
+  optional ApplicationIdProto application_id = 1;
+  optional LogAggregationStatusProto log_aggregation_status = 2;
+  optional string diagnostics = 3 [default = "N/A"];
 }
 
 message NodeHeartbeatResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 0eb7ff4..8046228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -666,7 +665,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
             if (logAggregationEnabled) {
               // pull log aggregation status for application running in this NM
-              Map<ApplicationId, LogAggregationReport> logAggregationReports =
+              List<LogAggregationReport> logAggregationReports =
                   getLogAggregationReportsForApps(context
                     .getLogAggregationStatusForApps());
               if (logAggregationReports != null
@@ -810,47 +809,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     statusUpdater.start();
   }
 
-  private Map<ApplicationId, LogAggregationReport>
-      getLogAggregationReportsForApps(
-          ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
-    Map<ApplicationId, LogAggregationReport> latestLogAggregationReports =
-        new HashMap<ApplicationId, LogAggregationReport>();
+  private List<LogAggregationReport> getLogAggregationReportsForApps(
+      ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
     LogAggregationReport status;
     while ((status = lastestLogAggregationStatus.poll()) != null) {
       this.logAggregationReportForAppsTempList.add(status);
     }
-    for (LogAggregationReport logAggregationReport
-        : this.logAggregationReportForAppsTempList) {
-      LogAggregationReport report = null;
-      if (latestLogAggregationReports.containsKey(logAggregationReport
-        .getApplicationId())) {
-        report =
-            latestLogAggregationReports.get(logAggregationReport
-              .getApplicationId());
-        report.setLogAggregationStatus(logAggregationReport
-          .getLogAggregationStatus());
-        String message = report.getDiagnosticMessage();
-        if (logAggregationReport.getDiagnosticMessage() != null
-            && !logAggregationReport.getDiagnosticMessage().isEmpty()) {
-          if (message != null) {
-            message += logAggregationReport.getDiagnosticMessage();
-          } else {
-            message = logAggregationReport.getDiagnosticMessage();
-          }
-          report.setDiagnosticMessage(message);
-        }
-      } else {
-        report = Records.newRecord(LogAggregationReport.class);
-        report.setApplicationId(logAggregationReport.getApplicationId());
-        report.setNodeId(this.nodeId);
-        report.setLogAggregationStatus(logAggregationReport
-          .getLogAggregationStatus());
-        report
-          .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage());
-      }
-      latestLogAggregationReports.put(logAggregationReport.getApplicationId(),
-        report);
-    }
-    return latestLogAggregationReports;
+    List<LogAggregationReport> reports = new ArrayList<LogAggregationReport>();
+    reports.addAll(logAggregationReportForAppsTempList);
+    return reports;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 3111f10..dd2ab25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -306,6 +306,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
                     + currentTime);
 
       String diagnosticMessage = "";
+      boolean logAggregationSucceedInThisCycle = true;
       final boolean rename = uploadedLogsInThisCycle;
       try {
         userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -338,20 +339,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
                 + LogAggregationUtils.getNodeString(nodeId) + " at "
                 + Times.format(currentTime) + "\n";
         renameTemporaryLogFileFailed = true;
+        logAggregationSucceedInThisCycle = false;
       }
 
       LogAggregationReport report =
           Records.newRecord(LogAggregationReport.class);
       report.setApplicationId(appId);
-      report.setNodeId(nodeId);
       report.setDiagnosticMessage(diagnosticMessage);
+      report.setLogAggregationStatus(logAggregationSucceedInThisCycle
+          ? LogAggregationStatus.RUNNING
+          : LogAggregationStatus.RUNNING_WITH_FAILURE);
+      this.context.getLogAggregationStatusForApps().add(report);
       if (appFinished) {
-        report.setLogAggregationStatus(renameTemporaryLogFileFailed
+        // If the app is finished, one extra final report with log aggregation
+        // status SUCCEEDED/FAILED will be sent to RM to inform the RM
+        // that the log aggregation in this NM is completed.
+        LogAggregationReport finalReport =
+            Records.newRecord(LogAggregationReport.class);
+        finalReport.setApplicationId(appId);
+        finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed
             ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
-      } else {
-        report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
+        this.context.getLogAggregationStatusForApps().add(report);
       }
-      this.context.getLogAggregationStatusForApps().add(report);
     } finally {
       if (writer != null) {
         writer.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 8abc478..f3dacd6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -22,12 +22,15 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -152,6 +156,13 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Map<NodeId, LogAggregationReport> logAggregationStatus =
       new HashMap<NodeId, LogAggregationReport>();
   private LogAggregationStatus logAggregationStatusForAppReport;
+  private int logAggregationSucceed = 0;
+  private int logAggregationFailed = 0;
+  private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
+      new HashMap<NodeId, List<String>>();
+  private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
+      new HashMap<NodeId, List<String>>();
+  private final int maxLogAggregationDiagnosticsInMemory;
 
   // These states stored are only valid when app is at killing or final_saving.
   private RMAppState stateBeforeKilling;
@@ -437,6 +448,14 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.logAggregationEnabled =
         conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
           YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    if (this.logAggregationEnabled) {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
+    } else {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
+    }
+    maxLogAggregationDiagnosticsInMemory = conf.getInt(
+        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
   }
 
   @Override
@@ -834,10 +853,9 @@ public class RMAppImpl implements RMApp, Recoverable {
 
       if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
         app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
-          LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
-            .getNodeId(), app.logAggregationEnabled
-              ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
-            ""));
+          LogAggregationReport.newInstance(app.applicationId,
+            app.logAggregationEnabled ? LogAggregationStatus.NOT_START
+                : LogAggregationStatus.DISABLED, ""));
       }
     };
   }
@@ -1401,18 +1419,20 @@ public class RMAppImpl implements RMApp, Recoverable {
       Map<NodeId, LogAggregationReport> outputs =
           new HashMap<NodeId, LogAggregationReport>();
       outputs.putAll(logAggregationStatus);
-      for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
-        if (!output.getValue().getLogAggregationStatus()
-          .equals(LogAggregationStatus.TIME_OUT)
-            && !output.getValue().getLogAggregationStatus()
-              .equals(LogAggregationStatus.SUCCEEDED)
-            && !output.getValue().getLogAggregationStatus()
-              .equals(LogAggregationStatus.FAILED)
-            && isAppInFinalState(this)
-            && System.currentTimeMillis() > this.logAggregationStartTime
-                + this.logAggregationStatusTimeout) {
-          output.getValue().setLogAggregationStatus(
-            LogAggregationStatus.TIME_OUT);
+      if (!isLogAggregationFinished()) {
+        for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+          if (!output.getValue().getLogAggregationStatus()
+            .equals(LogAggregationStatus.TIME_OUT)
+              && !output.getValue().getLogAggregationStatus()
+                .equals(LogAggregationStatus.SUCCEEDED)
+              && !output.getValue().getLogAggregationStatus()
+                .equals(LogAggregationStatus.FAILED)
+              && isAppInFinalState(this)
+              && System.currentTimeMillis() > this.logAggregationStartTime
+                  + this.logAggregationStatusTimeout) {
+            output.getValue().setLogAggregationStatus(
+              LogAggregationStatus.TIME_OUT);
+          }
         }
       }
       return outputs;
@@ -1424,32 +1444,46 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
     try {
       this.writeLock.lock();
-      if (this.logAggregationEnabled) {
+      if (this.logAggregationEnabled && !isLogAggregationFinished()) {
         LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+        boolean stateChangedToFinal = false;
         if (curReport == null) {
           this.logAggregationStatus.put(nodeId, report);
+          if (isLogAggregationFinishedForNM(report)) {
+            stateChangedToFinal = true;
+          }
         } else {
-          if (curReport.getLogAggregationStatus().equals(
-            LogAggregationStatus.TIME_OUT)) {
-            if (report.getLogAggregationStatus().equals(
-              LogAggregationStatus.SUCCEEDED)
-                || report.getLogAggregationStatus().equals(
-                  LogAggregationStatus.FAILED)) {
-              curReport.setLogAggregationStatus(report
-                .getLogAggregationStatus());
+          if (isLogAggregationFinishedForNM(report)) {
+            if (!isLogAggregationFinishedForNM(curReport)) {
+              stateChangedToFinal = true;
             }
-          } else {
-            curReport.setLogAggregationStatus(report.getLogAggregationStatus());
           }
-
-          if (report.getDiagnosticMessage() != null
-              && !report.getDiagnosticMessage().isEmpty()) {
-            curReport
-              .setDiagnosticMessage(curReport.getDiagnosticMessage() == null
-                  ? report.getDiagnosticMessage() : curReport
-                    .getDiagnosticMessage() + report.getDiagnosticMessage());
+          if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
+              || curReport.getLogAggregationStatus() !=
+                  LogAggregationStatus.RUNNING_WITH_FAILURE) {
+            if (curReport.getLogAggregationStatus()
+                == LogAggregationStatus.TIME_OUT
+                && report.getLogAggregationStatus()
+                    == LogAggregationStatus.RUNNING) {
+            // If the log aggregation status got from latest nm heartbeat
+            // is Running, and current log aggregation status is TimeOut,
+            // based on whether there are any failure messages for this NM,
+            // we will reset the log aggregation status as RUNNING or
+            // RUNNING_WITH_FAILURE
+              if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
+                  !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
+                report.setLogAggregationStatus(
+                    LogAggregationStatus.RUNNING_WITH_FAILURE);
+              }
+            }
+            curReport.setLogAggregationStatus(report
+              .getLogAggregationStatus());
           }
         }
+        updateLogAggregationDiagnosticMessages(nodeId, report);
+        if (isAppInFinalState(this) && stateChangedToFinal) {
+          updateLogAggregationStatus(nodeId);
+        }
       }
     } finally {
       this.writeLock.unlock();
@@ -1458,29 +1492,32 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   @Override
   public LogAggregationStatus getLogAggregationStatusForAppReport() {
-    if (!logAggregationEnabled) {
-      return LogAggregationStatus.DISABLED;
-    }
-    if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED
-        || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) {
-      return this.logAggregationStatusForAppReport;
-    }
     try {
       this.readLock.lock();
+      if (! logAggregationEnabled) {
+        return LogAggregationStatus.DISABLED;
+      }
+      if (isLogAggregationFinished()) {
+        return this.logAggregationStatusForAppReport;
+      }
       Map<NodeId, LogAggregationReport> reports =
           getLogAggregationReportsForApp();
       if (reports.size() == 0) {
-        return null;
+        return this.logAggregationStatusForAppReport;
       }
       int logNotStartCount = 0;
       int logCompletedCount = 0;
       int logTimeOutCount = 0;
       int logFailedCount = 0;
+      int logRunningWithFailure = 0;
       for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
         switch (report.getValue().getLogAggregationStatus()) {
           case NOT_START:
             logNotStartCount++;
             break;
+          case RUNNING_WITH_FAILURE:
+            logRunningWithFailure ++;
+            break;
           case SUCCEEDED:
             logCompletedCount++;
             break;
@@ -1506,19 +1543,122 @@ public class RMAppImpl implements RMApp, Recoverable {
         // the log aggregation is finished. And the log aggregation status will
         // not be updated anymore.
         if (logFailedCount > 0 && isAppInFinalState(this)) {
-          this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
           return LogAggregationStatus.FAILED;
         } else if (logTimeOutCount > 0) {
           return LogAggregationStatus.TIME_OUT;
         }
         if (isAppInFinalState(this)) {
-          this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
           return LogAggregationStatus.SUCCEEDED;
         }
+      } else if (logRunningWithFailure > 0) {
+        return LogAggregationStatus.RUNNING_WITH_FAILURE;
       }
       return LogAggregationStatus.RUNNING;
     } finally {
       this.readLock.unlock();
     }
   }
+
+  private boolean isLogAggregationFinished() {
+    return this.logAggregationStatusForAppReport
+      .equals(LogAggregationStatus.SUCCEEDED)
+        || this.logAggregationStatusForAppReport
+          .equals(LogAggregationStatus.FAILED);
+
+  }
+
+  private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
+    return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
+        || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
+  }
+
+  private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
+      LogAggregationReport report) {
+    if (report.getDiagnosticMessage() != null
+        && !report.getDiagnosticMessage().isEmpty()) {
+      if (report.getLogAggregationStatus()
+          == LogAggregationStatus.RUNNING ) {
+        List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
+        if (diagnostics == null) {
+          diagnostics = new ArrayList<String>();
+          logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
+        } else {
+          if (diagnostics.size()
+              == maxLogAggregationDiagnosticsInMemory) {
+            diagnostics.remove(0);
+          }
+        }
+        diagnostics.add(report.getDiagnosticMessage());
+        this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
+          StringUtils.join(diagnostics, "\n"));
+      } else if (report.getLogAggregationStatus()
+          == LogAggregationStatus.RUNNING_WITH_FAILURE) {
+        List<String> failureMessages =
+            logAggregationFailureMessagesForNMs.get(nodeId);
+        if (failureMessages == null) {
+          failureMessages = new ArrayList<String>();
+          logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
+        } else {
+          if (failureMessages.size()
+              == maxLogAggregationDiagnosticsInMemory) {
+            failureMessages.remove(0);
+          }
+        }
+        failureMessages.add(report.getDiagnosticMessage());
+      }
+    }
+  }
+
+  private void updateLogAggregationStatus(NodeId nodeId) {
+    LogAggregationStatus status =
+        this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
+    if (status.equals(LogAggregationStatus.SUCCEEDED)) {
+      this.logAggregationSucceed++;
+    } else if (status.equals(LogAggregationStatus.FAILED)) {
+      this.logAggregationFailed++;
+    }
+    if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
+      this.logAggregationStatusForAppReport =
+          LogAggregationStatus.SUCCEEDED;
+      // Since the log aggregation status for this application for all NMs
+      // is SUCCEEDED, it means all logs are aggregated successfully.
+      // We could remove all the cached log aggregation reports
+      this.logAggregationStatus.clear();
+      this.logAggregationDiagnosticsForNMs.clear();
+      this.logAggregationFailureMessagesForNMs.clear();
+    } else if (this.logAggregationSucceed + this.logAggregationFailed
+        == this.logAggregationStatus.size()) {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
+      // We have collected the log aggregation status for all NMs.
+      // The log aggregation status is FAILED which means the log
+      // aggregation fails in some NMs. We are only interested in the
+      // nodes where the log aggregation is failed. So we could remove
+      // the log aggregation details for those succeeded NMs
+      for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
+          this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<NodeId, LogAggregationReport> entry = it.next();
+        if (entry.getValue().getLogAggregationStatus()
+          .equals(LogAggregationStatus.SUCCEEDED)) {
+          it.remove();
+        }
+      }
+      // the log aggregation has finished/failed.
+      // and the status will not be updated anymore.
+      this.logAggregationDiagnosticsForNMs.clear();
+    }
+  }
+
+  public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+    try {
+      this.readLock.lock();
+      List<String> failureMessages =
+          this.logAggregationFailureMessagesForNMs.get(nodeId);
+      if (failureMessages == null || failureMessages.isEmpty()) {
+        return StringUtils.EMPTY;
+      }
+      return StringUtils.join(failureMessages, "\n");
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3be1867..a11aacf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -777,7 +775,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
 
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+      List<LogAggregationReport> logAggregationReportsForApps =
           statusEvent.getLogAggregationReportsForApps();
       if (logAggregationReportsForApps != null
           && !logAggregationReportsForApps.isEmpty()) {
@@ -915,12 +913,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   private void handleLogAggregationStatus(
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
-    for (Entry<ApplicationId, LogAggregationReport> report :
-        logAggregationReportsForApps.entrySet()) {
-      RMApp rmApp = this.context.getRMApps().get(report.getKey());
+      List<LogAggregationReport> logAggregationReportsForApps) {
+    for (LogAggregationReport report : logAggregationReportsForApps) {
+      RMApp rmApp = this.context.getRMApps().get(report.getApplicationId());
       if (rmApp != null) {
-        ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue());
+        ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index 4bbf610..b95d7d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -34,7 +32,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   private final List<ContainerStatus> containersCollection;
   private final NodeHeartbeatResponse latestResponse;
   private final List<ApplicationId> keepAliveAppIds;
-  private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
+  private List<LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -50,7 +48,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
       NodeHeartbeatResponse latestResponse,
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+      List<LogAggregationReport> logAggregationReportsForApps) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeHealthStatus = nodeHealthStatus;
     this.containersCollection = collection;
@@ -75,13 +73,12 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.keepAliveAppIds;
   }
 
-  public Map<ApplicationId, LogAggregationReport>
-      getLogAggregationReportsForApps() {
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
     return this.logAggregationReportsForApps;
   }
 
   public void setLogAggregationReportsForApps(
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+      List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
index 43e26be..38e0e3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.AppBlock;
-import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@@ -170,4 +170,13 @@ public class RMAppBlock extends AppBlock{
 
     tbody._()._();
   }
+
+  @Override
+  protected LogAggregationStatus getLogAggregationStatus() {
+    RMApp rmApp = this.rm.getRMContext().getRMApps().get(appID);
+    if (rmApp == null) {
+      return null;
+    }
+    return rmApp.getLogAggregationStatusForAppReport();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
index a2f61e3..f7f7c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@@ -93,6 +94,9 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
       .td("Log Aggregation does not Start.")._();
     table_description.tr().td(LogAggregationStatus.RUNNING.name())
       .td("Log Aggregation is Running.")._();
+    table_description.tr().td(LogAggregationStatus.RUNNING_WITH_FAILURE.name())
+      .td("Log Aggregation is Running, but has failures "
+          + "in previous cycles")._();
     table_description.tr().td(LogAggregationStatus.SUCCEEDED.name())
       .td("Log Aggregation is Succeeded. All of the logs have been "
           + "aggregated successfully.")._();
@@ -106,24 +110,29 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
     table_description._();
     div_description._();
 
-    boolean logAggregationEnabled =
-        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
-          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
     // Application Log aggregation status Table
     DIV<Hamlet> div = html.div(_INFO_WRAP);
     TABLE<DIV<Hamlet>> table =
         div.h3(
           "Log Aggregation: "
-              + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+              + (rmApp == null ? "N/A" : rmApp
+                .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp
+                .getLogAggregationStatusForAppReport().name())).table(
           "#LogAggregationStatus");
-    table.
-      tr().
-        th(_TH, "NodeId").
-        th(_TH, "Log Aggregation Status").
-        th(_TH, "Diagnostis Message").
-      _();
 
-    RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+    int maxLogAggregationDiagnosticsInMemory = conf.getInt(
+      YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+      YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+    table
+      .tr()
+      .th(_TH, "NodeId")
+      .th(_TH, "Log Aggregation Status")
+      .th(_TH, "Last "
+          + maxLogAggregationDiagnosticsInMemory + " Diagnostic Messages")
+      .th(_TH, "Last "
+          + maxLogAggregationDiagnosticsInMemory + " Failure Messages")._();
+
     if (rmApp != null) {
       Map<NodeId, LogAggregationReport> logAggregationReports =
           rmApp.getLogAggregationReportsForApp();
@@ -136,10 +145,14 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
           String message =
               report.getValue() == null ? null : report.getValue()
                 .getDiagnosticMessage();
+          String failureMessage =
+              report.getValue() == null ? null : ((RMAppImpl)rmApp)
+                  .getLogAggregationFailureMessagesForNM(report.getKey());
           table.tr()
             .td(report.getKey().toString())
             .td(status == null ? "N/A" : status.toString())
-            .td(message == null ? "N/A" : message)._();
+            .td(message == null ? "N/A" : message)
+            .td(failureMessage == null ? "N/A" : failureMessage)._();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15ccd967/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 4eec63f..9af4290 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -23,7 +23,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -155,26 +155,26 @@ public class TestRMAppLogAggregationStatus {
         .getLogAggregationStatus());
     }
 
-    Map<ApplicationId, LogAggregationReport> node1ReportForApp =
-        new HashMap<ApplicationId, LogAggregationReport>();
+    List<LogAggregationReport> node1ReportForApp =
+        new ArrayList<LogAggregationReport>();
     String messageForNode1_1 =
         "node1 logAggregation status updated at " + System.currentTimeMillis();
     LogAggregationReport report1 =
-        LogAggregationReport.newInstance(appId, nodeId1,
-          LogAggregationStatus.RUNNING, messageForNode1_1);
-    node1ReportForApp.put(appId, report1);
+        LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
+          messageForNode1_1);
+    node1ReportForApp.add(report1);
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node1ReportForApp));
 
-    Map<ApplicationId, LogAggregationReport> node2ReportForApp =
-        new HashMap<ApplicationId, LogAggregationReport>();
+    List<LogAggregationReport> node2ReportForApp =
+        new ArrayList<LogAggregationReport>();
     String messageForNode2_1 =
         "node2 logAggregation status updated at " + System.currentTimeMillis();
     LogAggregationReport report2 =
-        LogAggregationReport.newInstance(appId, nodeId2,
+        LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode2_1);
-    node2ReportForApp.put(appId, report2);
+    node2ReportForApp.add(report2);
     node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node2ReportForApp));
@@ -205,14 +205,14 @@ public class TestRMAppLogAggregationStatus {
     }
 
     // node1 updates its log aggregation status again
-    Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
-        new HashMap<ApplicationId, LogAggregationReport>();
+    List<LogAggregationReport> node1ReportForApp2 =
+        new ArrayList<LogAggregationReport>();
     String messageForNode1_2 =
         "node1 logAggregation status updated at " + System.currentTimeMillis();
     LogAggregationReport report1_2 =
-        LogAggregationReport.newInstance(appId, nodeId1,
+        LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode1_2);
-    node1ReportForApp2.put(appId, report1_2);
+    node1ReportForApp2.add(report1_2);
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node1ReportForApp2));
@@ -230,8 +230,9 @@ public class TestRMAppLogAggregationStatus {
       if (report.getKey().equals(node1.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
           .getLogAggregationStatus());
-        Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
-          .getValue().getDiagnosticMessage());
+        Assert.assertEquals(
+          messageForNode1_1 + "\n" + messageForNode1_2, report
+            .getValue().getDiagnosticMessage());
       } else if (report.getKey().equals(node2.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
           .getLogAggregationStatus());
@@ -268,15 +269,19 @@ public class TestRMAppLogAggregationStatus {
     // Finally, node1 finished its log aggregation and sent out its final
     // log aggregation status. The log aggregation status for node1 should
     // be changed from TIME_OUT to SUCCEEDED
-    Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
-        new HashMap<ApplicationId, LogAggregationReport>();
-    String messageForNode1_3 =
-        "node1 final logAggregation status updated at "
-            + System.currentTimeMillis();
-    LogAggregationReport report1_3 =
-        LogAggregationReport.newInstance(appId, nodeId1,
-          LogAggregationStatus.SUCCEEDED, messageForNode1_3);
-    node1ReportForApp3.put(appId, report1_3);
+    List<LogAggregationReport> node1ReportForApp3 =
+        new ArrayList<LogAggregationReport>();
+    LogAggregationReport report1_3;
+    for (int i = 0; i < 10 ; i ++) {
+      report1_3 =
+          LogAggregationReport.newInstance(appId,
+            LogAggregationStatus.RUNNING, "test_message_" + i);
+      node1ReportForApp3.add(report1_3);
+    }
+    node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
+      LogAggregationStatus.SUCCEEDED, ""));
+    // For every logAggregationReport cached in memory, we can only save at most
+    // 10 diagnostic messages/failure messages
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node1ReportForApp3));
@@ -290,8 +295,14 @@ public class TestRMAppLogAggregationStatus {
       if (report.getKey().equals(node1.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
           .getLogAggregationStatus());
-        Assert.assertEquals(messageForNode1_1 + messageForNode1_2
-            + messageForNode1_3, report.getValue().getDiagnosticMessage());
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < 9; i ++) {
+          builder.append("test_message_" + i);
+          builder.append("\n");
+        }
+        builder.append("test_message_" + 9);
+        Assert.assertEquals(builder.toString(), report.getValue()
+          .getDiagnosticMessage());
       } else if (report.getKey().equals(node2.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
           .getLogAggregationStatus());
@@ -301,6 +312,32 @@ public class TestRMAppLogAggregationStatus {
           .fail("should not contain log aggregation report for other nodes");
       }
     }
+
+    // update log aggregationStatus for node2 as FAILED,
+    // so the log aggregation status for the App will become FAILED,
+    // and we only keep the log aggregation reports whose status is FAILED,
+    // so the log aggregation report for node1 will be removed.
+    List<LogAggregationReport> node2ReportForApp2 =
+        new ArrayList<LogAggregationReport>();
+    LogAggregationReport report2_2 =
+        LogAggregationReport.newInstance(appId,
+          LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message");
+    LogAggregationReport report2_3 =
+        LogAggregationReport.newInstance(appId,
+          LogAggregationStatus.FAILED, "");
+    node2ReportForApp2.add(report2_2);
+    node2ReportForApp2.add(report2_3);
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node2ReportForApp2));
+    Assert.assertEquals(LogAggregationStatus.FAILED,
+      rmApp.getLogAggregationStatusForAppReport());
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertTrue(logAggregationStatus.size() == 1);
+    Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
+    Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
+    Assert.assertEquals("Fail_Message",
+      ((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2));
   }
 
   @Test (timeout = 10000)
@@ -317,9 +354,11 @@ public class TestRMAppLogAggregationStatus {
     // Enable the log aggregation
     conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     rmApp = (RMAppImpl)createRMApp(conf);
-    // If we do not know any NodeManagers for this application ,
-    // the log aggregation status will return null
-    Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
+    // If we do not know any NodeManagers for this application , and
+    // the log aggregation is enabled, the log aggregation status will
+    // return NOT_START
+    Assert.assertEquals(LogAggregationStatus.NOT_START,
+      rmApp.getLogAggregationStatusForAppReport());
 
     NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
     NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
@@ -329,24 +368,24 @@ public class TestRMAppLogAggregationStatus {
     // If the log aggregation status for all NMs are NOT_START,
     // the log aggregation status for this app will return NOT_START
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     Assert.assertEquals(LogAggregationStatus.NOT_START,
       rmApp.getLogAggregationStatusForAppReport());
 
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     Assert.assertEquals(LogAggregationStatus.RUNNING,
       rmApp.getLogAggregationStatusForAppReport());
 
@@ -357,13 +396,13 @@ public class TestRMAppLogAggregationStatus {
     // others are SUCCEEDED, the log aggregation status for this app will
     // return TIME_OUT
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     Assert.assertEquals(LogAggregationStatus.TIME_OUT,
       rmApp.getLogAggregationStatusForAppReport());
 
@@ -371,17 +410,59 @@ public class TestRMAppLogAggregationStatus {
     // is at the final state, the log aggregation status for this app will
     // return SUCCEEDED
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
       rmApp.getLogAggregationStatusForAppReport());
 
     rmApp = (RMAppImpl)createRMApp(conf);
+    // If the log aggregation status for at least one of NMs are RUNNING,
+    // the log aggregation status for this app will return RUNNING
+    rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    Assert.assertEquals(LogAggregationStatus.RUNNING,
+      rmApp.getLogAggregationStatusForAppReport());
+
+    // If the log aggregation status for at least one of NMs
+    // are RUNNING_WITH_FAILURE, the log aggregation status
+    // for this app will return RUNNING_WITH_FAILURE
+    rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE,
+      ""));
+    Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+      rmApp.getLogAggregationStatusForAppReport());
+
+    // For node4, the previous log aggregation status is RUNNING_WITH_FAILURE,
+    // it will not be changed even it get a new log aggregation status
+    // as RUNNING
+    rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+      rmApp.getLogAggregationStatusForAppReport());
+
     rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
     Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
     // If at least of one log aggregation status for one NM is FAILED,
@@ -389,13 +470,13 @@ public class TestRMAppLogAggregationStatus {
     // at the final state, the log aggregation status for this app
     // will return FAILED
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
     Assert.assertEquals(LogAggregationStatus.FAILED,
       rmApp.getLogAggregationStatusForAppReport());