You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/04/10 17:39:07 UTC

[1/2] hadoop git commit: YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 83979e61a -> 92431c961


http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
new file mode 100644
index 0000000..7397d38
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestRMAppLogAggregationStatus {
+
+  private RMContext rmContext;
+  private YarnScheduler scheduler;
+
+  private SchedulerEventType eventType;
+
+  private ApplicationId appId;
+
+
+  private final class TestSchedulerEventDispatcher implements
+  EventHandler<SchedulerEvent> {
+    @Override
+    public void handle(SchedulerEvent event) {
+      scheduler.handle(event);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    InlineDispatcher rmDispatcher = new InlineDispatcher();
+
+    rmContext =
+        new RMContextImpl(rmDispatcher, null, null, null,
+          null, null, null, null, null,
+          new RMApplicationHistoryWriter());
+    rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+
+    scheduler = mock(YarnScheduler.class);
+    doAnswer(
+        new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
+            eventType = event.getType();
+            if (eventType == SchedulerEventType.NODE_UPDATE) {
+              //DO NOTHING
+            }
+            return null;
+          }
+        }
+        ).when(scheduler).handle(any(SchedulerEvent.class));
+
+    rmDispatcher.register(SchedulerEventType.class,
+        new TestSchedulerEventDispatcher());
+
+    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testLogAggregationStatus() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, 1500);
+    RMApp rmApp = createRMApp(conf);
+    this.rmContext.getRMApps().put(appId, rmApp);
+    rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.START));
+    rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_NEW_SAVED));
+    rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_ACCEPTED));
+
+    // This application will be running on two nodes
+    NodeId nodeId1 = NodeId.newInstance("localhost", 1234);
+    Resource capability = Resource.newInstance(4096, 4);
+    RMNodeImpl node1 =
+        new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
+    node1.handle(new RMNodeStartedEvent(nodeId1, null, null));
+    rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
+
+    NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
+    RMNodeImpl node2 =
+        new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
+    node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null));
+    rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
+
+    // The initial log aggregation status for these two nodes
+    // should be NOT_START
+    Map<NodeId, LogAggregationReport> logAggregationStatus =
+        rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      Assert.assertEquals(LogAggregationStatus.NOT_START, report.getValue()
+        .getLogAggregationStatus());
+    }
+
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode1_1 =
+        "node1 logAggregation status updated at " + System.currentTimeMillis();
+    LogAggregationReport report1 =
+        LogAggregationReport.newInstance(appId, nodeId1,
+          LogAggregationStatus.RUNNING, messageForNode1_1);
+    node1ReportForApp.put(appId, report1);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node1ReportForApp));
+
+    Map<ApplicationId, LogAggregationReport> node2ReportForApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode2_1 =
+        "node2 logAggregation status updated at " + System.currentTimeMillis();
+    LogAggregationReport report2 =
+        LogAggregationReport.newInstance(appId, nodeId2,
+          LogAggregationStatus.RUNNING, messageForNode2_1);
+    node2ReportForApp.put(appId, report2);
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node2ReportForApp));
+    // node1 and node2 have each reported their log aggregation status;
+    // verify that the status recorded for both node1 and node2
+    // has been updated
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      if (report.getKey().equals(node1.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode1_1, report.getValue()
+          .getDiagnosticMessage());
+      } else if (report.getKey().equals(node2.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode2_1, report.getValue()
+          .getDiagnosticMessage());
+      } else {
+        // should not contain log aggregation report for other nodes
+        Assert
+          .fail("should not contain log aggregation report for other nodes");
+      }
+    }
+
+    // node1 updates its log aggregation status again
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode1_2 =
+        "node1 logAggregation status updated at " + System.currentTimeMillis();
+    LogAggregationReport report1_2 =
+        LogAggregationReport.newInstance(appId, nodeId1,
+          LogAggregationStatus.RUNNING, messageForNode1_2);
+    node1ReportForApp2.put(appId, report1_2);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node1ReportForApp2));
+
+    // verify that node1's report has been updated: its diagnostic
+    // message is now the first and second messages concatenated
+    // verify that the log aggregation status for node2
+    // has not changed
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      if (report.getKey().equals(node1.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
+          .getValue().getDiagnosticMessage());
+      } else if (report.getKey().equals(node2.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode2_1, report.getValue()
+          .getDiagnosticMessage());
+      } else {
+        // should not contain log aggregation report for other nodes
+        Assert
+          .fail("should not contain log aggregation report for other nodes");
+      }
+    }
+
+    // kill the application
+    rmApp.handle(new RMAppEvent(appId, RMAppEventType.KILL));
+    rmApp.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_KILLED));
+    rmApp.handle(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED));
+    Assert.assertEquals(RMAppState.KILLED, rmApp.getState());
+
+    // wait for 1500 ms
+    Thread.sleep(1500);
+
+    // the log aggregation status for both nodes should be changed
+    // to TIME_OUT
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+        .getLogAggregationStatus());
+    }
+
+    // Finally, node1 finished its log aggregation and sent out its final
+    // log aggregation status. The log aggregation status for node1 should
+    // change from TIME_OUT to FINISHED
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode1_3 =
+        "node1 final logAggregation status updated at "
+            + System.currentTimeMillis();
+    LogAggregationReport report1_3 =
+        LogAggregationReport.newInstance(appId, nodeId1,
+          LogAggregationStatus.FINISHED, messageForNode1_3);
+    node1ReportForApp3.put(appId, report1_3);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node1ReportForApp3));
+
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      if (report.getKey().equals(node1.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode1_1 + messageForNode1_2
+            + messageForNode1_3, report.getValue().getDiagnosticMessage());
+      } else if (report.getKey().equals(node2.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+          .getLogAggregationStatus());
+      } else {
+        // should not contain log aggregation report for other nodes
+        Assert
+          .fail("should not contain log aggregation report for other nodes");
+      }
+    }
+  }
+
+  private RMApp createRMApp(Configuration conf) {
+    ApplicationSubmissionContext submissionContext =
+        ApplicationSubmissionContext.newInstance(appId, "test", "default",
+          Priority.newInstance(0), null, false, true,
+          2, Resource.newInstance(10, 2), "test");
+    return new RMAppImpl(this.appId, this.rmContext,
+      conf, "test", "test", "default", submissionContext,
+      this.rmContext.getScheduler(),
+      this.rmContext.getApplicationMasterService(),
+      System.currentTimeMillis(), "test",
+      null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ec990f9..81de286 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -271,4 +272,9 @@ public class MockRMApp implements RMApp {
   public ResourceRequest getAMResourceRequest() {
     return this.amReq; 
   }
+
+  @Override
+  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }


[2/2] hadoop git commit: YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.

Posted by ju...@apache.org.
YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92431c96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92431c96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92431c96

Branch: refs/heads/trunk
Commit: 92431c961741747b5d6442f4025016d48d9a6863
Parents: 83979e6
Author: Junping Du <ju...@apache.org>
Authored: Fri Apr 10 08:56:18 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri Apr 10 08:56:18 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  11 +
 .../protocolrecords/LogAggregationReport.java   | 104 ++++++
 .../protocolrecords/NodeHeartbeatRequest.java   |   8 +
 .../impl/pb/LogAggregationReportPBImpl.java     | 227 +++++++++++++
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  82 ++++-
 .../api/records/LogAggregationStatus.java       |  31 ++
 .../hadoop/yarn/server/webapp/AppBlock.java     |  14 +-
 .../main/proto/yarn_server_common_protos.proto  |   8 +
 .../yarn_server_common_service_protos.proto     |  13 +
 .../hadoop/yarn/server/nodemanager/Context.java |   5 +
 .../yarn/server/nodemanager/NodeManager.java    |  12 +
 .../nodemanager/NodeStatusUpdaterImpl.java      |  72 ++++-
 .../logaggregation/AppLogAggregatorImpl.java    |  34 +-
 .../resourcemanager/ResourceTrackerService.java |  12 +-
 .../server/resourcemanager/rmapp/RMApp.java     |   3 +
 .../server/resourcemanager/rmapp/RMAppImpl.java |  88 +++++
 .../resourcemanager/rmnode/RMNodeImpl.java      |  24 +-
 .../rmnode/RMNodeStatusEvent.java               |  26 ++
 .../webapp/AppLogAggregationStatusPage.java     |  41 +++
 .../webapp/RMAppLogAggregationStatusBlock.java  | 148 +++++++++
 .../server/resourcemanager/webapp/RMWebApp.java |   2 +
 .../resourcemanager/webapp/RmController.java    |   4 +
 .../applicationsmanager/MockAsm.java            |   6 +
 .../TestRMAppLogAggregationStatus.java          | 318 +++++++++++++++++++
 .../server/resourcemanager/rmapp/MockRMApp.java |   6 +
 26 files changed, 1291 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 72af3ba..edd1125 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -60,6 +60,9 @@ Release 2.8.0 - UNRELEASED
     container-executor for outbound network traffic control. (Sidharta Seethana
     via vinodkv)
 
+    YARN-1376. NM need to notify the log aggregation status to RM through 
+    heartbeat. (Xuan Gong via junping_du)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 13e9a10..253ae08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -742,6 +742,17 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
 
   /**
+   * How long, in milliseconds, the ResourceManager waits for a NodeManager
+   * to report its log aggregation status. If the NodeManager has not reported
+   * within the configured time, the RM reports the log aggregation status
+   * for that NodeManager as TIME_OUT.
+   */
+  public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS =
+      YARN_PREFIX + "log-aggregation-status.time-out.ms";
+  public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
+      = 10 * 60 * 1000;
+
+  /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if Log
    * aggregation is disabled
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
new file mode 100644
index 0000000..808804b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code LogAggregationReport} describes the log aggregation status of one
+ * application on one NodeManager.
+ * <p>
+ * It includes details such as:
+ * <ul>
+ *   <li>{@link ApplicationId} of the application.</li>
+ *   <li>{@link NodeId} of the NodeManager.</li>
+ *   <li>{@link LogAggregationStatus}</li>
+ *   <li>Diagnostic information</li>
+ * </ul>
+ */
+@Public
+@Unstable
+public abstract class LogAggregationReport {
+
+  @Public
+  @Unstable
+  public static LogAggregationReport newInstance(ApplicationId appId,
+      NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+    LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
+    report.setApplicationId(appId);
+    report.setNodeId(nodeId); // record which NodeManager the report is from
+    report.setLogAggregationStatus(status);
+    report.setDiagnosticMessage(diagnosticMessage);
+    return report;
+  }
+
+  /**
+   * Get the <code>ApplicationId</code> of the application.
+   * @return <code>ApplicationId</code> of the application
+   */
+  @Public
+  @Unstable
+  public abstract ApplicationId getApplicationId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationId(ApplicationId appId);
+
+  /**
+   * Get the <code>NodeId</code> of the NodeManager.
+   * @return <code>NodeId</code> of the NodeManager
+   */
+  @Public
+  @Unstable
+  public abstract NodeId getNodeId();
+
+  @Public
+  @Unstable
+  public abstract void setNodeId(NodeId nodeId);
+
+  /**
+   * Get the <code>LogAggregationStatus</code>.
+   * @return <code>LogAggregationStatus</code>
+   */
+  @Public
+  @Unstable
+  public abstract LogAggregationStatus getLogAggregationStatus();
+
+  @Public
+  @Unstable
+  public abstract void setLogAggregationStatus(
+      LogAggregationStatus logAggregationStatus);
+
+  /**
+   * Get the <em>diagnostic information</em> of this log aggregation.
+   * @return <em>diagnostic information</em> of this log aggregation
+   */
+  @Public
+  @Unstable
+  public abstract String getDiagnosticMessage();
+
+  @Public
+  @Unstable
+  public abstract void setDiagnosticMessage(String diagnosticMessage);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index b80d9ce..227363f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -51,4 +53,10 @@ public abstract class NodeHeartbeatRequest {
   
   public abstract Set<String> getNodeLabels();
   public abstract void setNodeLabels(Set<String> nodeLabels);
+
+  public abstract Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps();
+
+  public abstract void setLogAggregationReportsForApps(
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
new file mode 100644
index 0000000..7999fa7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogAggregationReportPBImpl extends LogAggregationReport {
+
+  LogAggregationReportProto proto = LogAggregationReportProto
+    .getDefaultInstance();
+  LogAggregationReportProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
+
+  private ApplicationId applicationId;
+  private NodeId nodeId;
+
+  public LogAggregationReportPBImpl() {
+    builder = LogAggregationReportProto.newBuilder();
+  }
+
+  public LogAggregationReportPBImpl(LogAggregationReportProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public LogAggregationReportProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    // Require the exact same class. An isAssignableFrom() check also admits
+    // supertypes (e.g. a plain Object), for which cast() below would throw.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.applicationId != null
+        && !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
+          builder.getApplicationId())) {
+      builder.setApplicationId(convertToProtoFormat(this.applicationId));
+    }
+
+    if (this.nodeId != null
+        && !((NodeIdPBImpl) this.nodeId).getProto().equals(
+          builder.getNodeId())) {
+      builder.setNodeId(convertToProtoFormat(this.nodeId));
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = LogAggregationReportProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    // Decoded lazily: the first call converts the id held by the proto (or
+    // by the builder when viaProto is false) and caches it; if the field
+    // is unset nothing is cached and null is returned.
+    if (this.applicationId == null) {
+      LogAggregationReportProtoOrBuilder source = viaProto ? proto : builder;
+      if (source.hasApplicationId()) {
+        this.applicationId = convertFromProtoFormat(source.getApplicationId());
+      }
+    }
+    return this.applicationId;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null)
+      builder.clearApplicationId();
+    this.applicationId = appId;
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(
+      ApplicationIdProto applicationId) {
+    return new ApplicationIdPBImpl(applicationId);
+  }
+
+  @Override
+  public LogAggregationStatus getLogAggregationStatus() {
+    // Not cached locally: read from the proto or builder on every call.
+    LogAggregationReportProtoOrBuilder source = viaProto ? proto : builder;
+    return source.hasLogAggregationStatus()
+        ? convertFromProtoFormat(source.getLogAggregationStatus())
+        : null;
+  }
+
+  @Override
+  public void
+      setLogAggregationStatus(LogAggregationStatus logAggregationStatus) {
+    maybeInitBuilder();
+    if (logAggregationStatus == null) {
+      builder.clearLogAggregationStatus();
+      return;
+    }
+    builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus));
+  }
+
+  private LogAggregationStatus convertFromProtoFormat(
+      LogAggregationStatusProto s) {
+    return LogAggregationStatus.valueOf(s.name().replace(
+      LOGAGGREGATION_STATUS_PREFIX, ""));
+  }
+
+  private LogAggregationStatusProto
+      convertToProtoFormat(LogAggregationStatus s) {
+    return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX
+        + s.name());
+  }
+
+  @Override
+  public String getDiagnosticMessage() {
+    // Unset diagnostics yield null rather than the proto-level default.
+    LogAggregationReportProtoOrBuilder source = viaProto ? proto : builder;
+    return source.hasDiagnostics()
+        ? source.getDiagnostics()
+        : null;
+  }
+
+  @Override
+  public void setDiagnosticMessage(String diagnosticMessage) {
+    maybeInitBuilder();
+    if (diagnosticMessage == null) {
+      builder.clearDiagnostics();
+      return;
+    }
+    builder.setDiagnostics(diagnosticMessage);
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    // Decoded lazily: the first call converts the node id held by the proto
+    // (or by the builder when viaProto is false) and caches it; if the
+    // field is unset nothing is cached and null is returned.
+    if (this.nodeId == null) {
+      LogAggregationReportProtoOrBuilder source = viaProto ? proto : builder;
+      if (source.hasNodeId()) {
+        this.nodeId = convertFromProtoFormat(source.getNodeId());
+      }
+    }
+    return this.nodeId;
+  }
+
+  @Override
+  public void setNodeId(NodeId nodeId) {
+    maybeInitBuilder();
+    if (nodeId == null)
+      builder.clearNodeId();
+    this.nodeId = nodeId;
+  }
+
+  private NodeIdProto convertToProtoFormat(NodeId t) {
+    return ((NodeIdPBImpl) t).getProto();
+  }
+
+  private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
+    return new NodeIdPBImpl(nodeId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 16d47f9..03db39c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,15 +18,24 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -42,6 +51,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
   private Set<String> labels = null;
+  private Map<ApplicationId, LogAggregationReport>
+      logAggregationReportsForApps = null;
   
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
@@ -91,6 +102,25 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       builder.setNodeLabels(StringArrayProto.newBuilder()
           .addAllElements(this.labels).build());
     }
+    if (this.logAggregationReportsForApps != null) {
+      addLogAggregationStatusForAppsToProto();
+    }
+  }
+
+  private void addLogAggregationStatusForAppsToProto() {
+    maybeInitBuilder();
+    builder.clearLogAggregationReportsForApps();
+    for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps
+      .entrySet()) {
+      builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
+        .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
+        .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+    }
+  }
+
+  private LogAggregationReportProto convertToProtoFormat(
+      LogAggregationReport value) {
+    return ((LogAggregationReportPBImpl) value).getProto();
   }
 
   private void mergeLocalToProto() {
@@ -215,4 +245,54 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     StringArrayProto nodeLabels = p.getNodeLabels();
     labels = new HashSet<String>(nodeLabels.getElementsList());
   }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  @Override
+  public Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps() {
+    // Decode the reports from the proto on first access, then reuse the map.
+    if (this.logAggregationReportsForApps == null) {
+      initLogAggregationReportsForApps();
+    }
+    return this.logAggregationReportsForApps;
+  }
+
+  private void initLogAggregationReportsForApps() {
+    // Materialise the repeated proto field as an appId -> report map; a
+    // later entry for the same application replaces an earlier one.
+    NodeHeartbeatRequestProtoOrBuilder source = viaProto ? proto : builder;
+    this.logAggregationReportsForApps =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    for (LogAggregationReportsForAppsProto item
+        : source.getLogAggregationReportsForAppsList()) {
+      this.logAggregationReportsForApps.put(
+          convertFromProtoFormat(item.getAppId()),
+          convertFromProtoFormat(item.getLogAggregationReport()));
+    }
+  }
+
+  private LogAggregationReport convertFromProtoFormat(
+      LogAggregationReportProto logAggregationReport) {
+    return new LogAggregationReportPBImpl(logAggregationReport);
+  }
+
+  @Override
+  public void setLogAggregationReportsForApps(
+      Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
+    // Reset first so that a null or empty argument clears earlier reports.
+    maybeInitBuilder();
+    builder.clearLogAggregationReportsForApps();
+    this.logAggregationReportsForApps =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    if (logAggregationStatusForApps != null) {
+      this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+    }
+  }
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
new file mode 100644
index 0000000..496767f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+/**
+ * <p>Status of Log aggregation.</p>
+ */
+public enum LogAggregationStatus {
+  DISABLED,
+  NOT_START,
+  RUNNING,
+  FINISHED,
+  FAILED,
+  TIME_OUT
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
index ae4737d..d5a3dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.webapp;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.WEB_UI_TYPE;
+
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -154,7 +157,7 @@ public class AppBlock extends HtmlBlock {
       html.script().$type("text/javascript")._(script.toString())._();
     }
 
-    info("Application Overview")
+    ResponseInfo overviewTable = info("Application Overview")
       ._("User:", app.getUser())
       ._("Name:", app.getName())
       ._("Application Type:", app.getType())
@@ -181,8 +184,13 @@ public class AppBlock extends HtmlBlock {
           .getAppState() == YarnApplicationState.FINISHED
             || app.getAppState() == YarnApplicationState.FAILED
             || app.getAppState() == YarnApplicationState.KILLED ? "History"
-            : "ApplicationMaster")
-      ._("Diagnostics:",
+            : "ApplicationMaster");
+    if (webUiType != null
+        && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
+      overviewTable._("Log Aggregation Status",
+        root_url("logaggregationstatus", app.getAppId()), "Status");
+    }
+    overviewTable._("Diagnostics:",
         app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
 
     Collection<ApplicationAttemptReport> attempts;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 01fac32..6e9f4cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -54,3 +54,15 @@ message VersionProto {
   optional int32 minor_version = 2;
 }
 
+// Wire form of the Java LogAggregationStatus enum. The PB record converts
+// between the two by name ("LOG_" + constant), so every Java constant needs
+// a LOG_-prefixed value here.
+enum LogAggregationStatusProto {
+  LOG_DISABLED = 1;
+  LOG_NOT_START = 2;
+  LOG_RUNNING = 3;
+  LOG_FINISHED = 4;
+  LOG_TIME_OUT = 5;
+  LOG_FAILED = 6;
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d8c92c4..3103582 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -50,6 +50,19 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_container_token_master_key = 2;
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional StringArrayProto nodeLabels = 4;
+  repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
+}
+
+message LogAggregationReportsForAppsProto {
+  optional ApplicationIdProto appId = 1;
+  optional LogAggregationReportProto log_aggregation_report = 2;
+}
+
+message LogAggregationReportProto {
+  optional ApplicationIdProto application_id = 1;
+  optional NodeIdProto node_id = 2;
+  optional LogAggregationStatusProto log_aggregation_status = 3;
+  optional string diagnostics = 4 [default = "N/A"];
 }
 
 message NodeHeartbeatResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 6e7e2ec..42a4234 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.security.Credentials;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -77,4 +79,7 @@ public interface Context {
   boolean getDecommissioned();
 
   void setDecommissioned(boolean isDecommissioned);
+
+  ConcurrentLinkedQueue<LogAggregationReport>
+      getLogAggregationStatusForApps();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 9831fc4..0bac8d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -384,6 +386,8 @@ public class NodeManager extends CompositeService
         .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
     private final NMStateStoreService stateStore;
     private boolean isDecommissioned = false;
+    private final ConcurrentLinkedQueue<LogAggregationReport>
+        logAggregationReportForApps;
 
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
@@ -397,6 +401,8 @@ public class NodeManager extends CompositeService
       this.nodeHealthStatus.setHealthReport("Healthy");
       this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
       this.stateStore = stateStore;
+      this.logAggregationReportForApps = new ConcurrentLinkedQueue<
+          LogAggregationReport>();
     }
 
     /**
@@ -488,6 +494,12 @@ public class NodeManager extends CompositeService
         Map<ApplicationId, Credentials> systemCredentials) {
       this.systemCredentials = systemCredentials;
     }
+
+    @Override
+    public ConcurrentLinkedQueue<LogAggregationReport>
+        getLogAggregationStatusForApps() {
+      return this.logAggregationReportForApps;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 2549e0f..b1ab5f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Random;
 import java.util.Set;
 
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -115,6 +118,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   // Duration for which to track recently stopped container.
   private long durationToTrackStoppedContainers;
 
+  private boolean logAggregationEnabled;
+
+  private final List<LogAggregationReport> logAggregationReportForAppsTempList;
+
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
@@ -144,6 +151,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
     this.pendingCompletedContainers =
         new HashMap<ContainerId, ContainerStatus>();
+    this.logAggregationReportForAppsTempList =
+        new ArrayList<LogAggregationReport>();
   }
 
   @Override
@@ -193,6 +202,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     LOG.info("Initialized nodemanager for " + nodeId + ":" +
         " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
         " virtual-cores=" + virtualCores);
+
+    this.logAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
   }
 
   @Override
@@ -649,6 +662,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                     NodeStatusUpdaterImpl.this.context
                         .getNMTokenSecretManager().getCurrentKey(),
                     nodeLabelsForHeartbeat);
+
+            if (logAggregationEnabled) {
+              // pull log aggregation status for application running in this NM
+              Map<ApplicationId, LogAggregationReport> logAggregationReports =
+                  getLogAggregationReportsForApps(context
+                    .getLogAggregationStatusForApps());
+              if (logAggregationReports != null
+                  && !logAggregationReports.isEmpty()) {
+                request.setLogAggregationReportsForApps(logAggregationReports);
+              }
+            }
+
             response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -698,6 +723,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
+            logAggregationReportForAppsTempList.clear();
             lastHeartbeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanup();
@@ -782,6 +808,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         new Thread(statusUpdaterRunnable, "Node Status Updater");
     statusUpdater.start();
   }
-  
-  
+
+  /**
+   * Drains the queued reports into the pending list, then folds that list
+   * into one report per application: the newest status wins and non-empty
+   * diagnostics are appended in order.
+   */
+  private Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps(
+          ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
+    // The pending list is not emptied here, so its entries are merged again
+    // on every call until it is cleared elsewhere.
+    for (LogAggregationReport queued = lastestLogAggregationStatus.poll();
+        queued != null; queued = lastestLogAggregationStatus.poll()) {
+      this.logAggregationReportForAppsTempList.add(queued);
+    }
+
+    Map<ApplicationId, LogAggregationReport> mergedByApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    for (LogAggregationReport incoming
+        : this.logAggregationReportForAppsTempList) {
+      ApplicationId appId = incoming.getApplicationId();
+      String incomingDiag = incoming.getDiagnosticMessage();
+      LogAggregationReport merged = mergedByApp.get(appId);
+      if (merged == null) {
+        // First report for this application: copy it under this node's id.
+        merged = Records.newRecord(LogAggregationReport.class);
+        merged.setApplicationId(appId);
+        merged.setNodeId(this.nodeId);
+        merged.setLogAggregationStatus(incoming.getLogAggregationStatus());
+        merged.setDiagnosticMessage(incomingDiag);
+        mergedByApp.put(appId, merged);
+        continue;
+      }
+      // Later report for the same application: take its status and append
+      // any diagnostics it carries.
+      merged.setLogAggregationStatus(incoming.getLogAggregationStatus());
+      if (incomingDiag != null && !incomingDiag.isEmpty()) {
+        String existingDiag = merged.getDiagnosticMessage();
+        merged.setDiagnosticMessage(
+            existingDiag == null ? incomingDiag : existingDiag + incomingDiag);
+      }
+    }
+    return mergedByApp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 393576b..bf7d5f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -57,12 +57,16 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.Times;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
@@ -120,6 +124,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   // This variable is only for testing
   private final AtomicBoolean waiting = new AtomicBoolean(false);
 
+  private boolean renameTemporaryLogFileFailed = false;
+
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
 
@@ -292,12 +298,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         writer.close();
       }
 
+      long currentTime = System.currentTimeMillis();
       final Path renamedPath = this.rollingMonitorInterval <= 0
               ? remoteNodeLogFileForApp : new Path(
                 remoteNodeLogFileForApp.getParent(),
                 remoteNodeLogFileForApp.getName() + "_"
-                    + System.currentTimeMillis());
+                    + currentTime);
 
+      String diagnosticMessage = "";
       final boolean rename = uploadedLogsInThisCycle;
       try {
         userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -314,12 +322,36 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             return null;
           }
         });
+        diagnosticMessage =
+            "Log uploaded successfully for Application: " + appId
+                + " in NodeManager: "
+                + LogAggregationUtils.getNodeString(nodeId) + " at "
+                + Times.format(currentTime) + "\n";
       } catch (Exception e) {
         LOG.error(
           "Failed to move temporary log file to final location: ["
               + remoteNodeTmpLogFileForApp + "] to ["
               + renamedPath + "]", e);
+        diagnosticMessage =
+            "Log uploaded failed for Application: " + appId
+                + " in NodeManager: "
+                + LogAggregationUtils.getNodeString(nodeId) + " at "
+                + Times.format(currentTime) + "\n";
+        renameTemporaryLogFileFailed = true;
+      }
+
+      LogAggregationReport report =
+          Records.newRecord(LogAggregationReport.class);
+      report.setApplicationId(appId);
+      report.setNodeId(nodeId);
+      report.setDiagnosticMessage(diagnosticMessage);
+      if (appFinished) {
+        report.setLogAggregationStatus(renameTemporaryLogFileFailed
+            ? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED);
+      } else {
+        report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
       }
+      this.context.getLogAggregationStatusForApps().add(report);
     } finally {
       if (writer != null) {
         writer.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 22efe25..5e2dc7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -458,10 +458,16 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     // 4. Send status to RMNode, saving the latest response.
-    this.rmContext.getDispatcher().getEventHandler().handle(
+    RMNodeStatusEvent nodeStatusEvent =
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
-            remoteNodeStatus.getContainersStatuses(), 
-            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+          remoteNodeStatus.getContainersStatuses(),
+          remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
+    if (request.getLogAggregationReportsForApps() != null
+        && !request.getLogAggregationReportsForApps().isEmpty()) {
+      nodeStatusEvent.setLogAggregationReportsForApps(request
+        .getLogAggregationReportsForApps());
+    }
+    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
 
     // 5. Update node's labels to RM's NodeLabelManager.
     if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fbcaab9..33eedbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -242,4 +243,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
   ReservationId getReservationId();
   
   ResourceRequest getAMResourceRequest();
+
+  Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d1737a..47c4807 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -25,9 +25,11 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -142,6 +146,12 @@ public class RMAppImpl implements RMApp, Recoverable {
       new AppFinishedTransition();
   private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
 
+  private final boolean logAggregationEnabled;
+  private long logAggregationStartTime = 0;
+  private final long logAggregationStatusTimeout;
+  private final Map<NodeId, LogAggregationReport> logAggregationStatus =
+      new HashMap<NodeId, LogAggregationReport>();
+
   // These states stored are only valid when app is at killing or final_saving.
   private RMAppState stateBeforeKilling;
   private RMAppState stateBeforeFinalSaving;
@@ -413,6 +423,19 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
     rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
+
+    long localLogAggregationStatusTimeout =
+        conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+    if (localLogAggregationStatusTimeout <= 0) {
+      this.logAggregationStatusTimeout =
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+    } else {
+      this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
+    }
+    this.logAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
   }
 
   @Override
@@ -803,6 +826,12 @@ public class RMAppImpl implements RMApp, Recoverable {
       
       // otherwise, add it to ranNodes for further process
       app.ranNodes.add(nodeAddedEvent.getNodeId());
+
+      app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
+        LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
+          .getNodeId(), app.logAggregationEnabled
+            ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
+          ""));
     };
   }
 
@@ -1153,6 +1182,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
 
     public void transition(RMAppImpl app, RMAppEvent event) {
+      app.logAggregationStartTime = System.currentTimeMillis();
       for (NodeId nodeId : app.getRanNodes()) {
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
@@ -1356,4 +1386,62 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
     return credentials;
   }
+
+  /** Returns copies of the per-node reports; overdue ones read TIME_OUT. */
+  @Override
+  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+    this.readLock.lock(); // before try, so finally only unlocks a held lock
+    try {
+      boolean expired = isAppInFinalState(this) && System.currentTimeMillis()
+          > this.logAggregationStartTime + this.logAggregationStatusTimeout;
+      Map<NodeId, LogAggregationReport> outputs =
+          new HashMap<NodeId, LogAggregationReport>();
+      for (Entry<NodeId, LogAggregationReport> e
+          : logAggregationStatus.entrySet()) {
+        LogAggregationStatus status = e.getValue().getLogAggregationStatus();
+        if (expired && !status.equals(LogAggregationStatus.TIME_OUT)
+            && !status.equals(LogAggregationStatus.FINISHED)) {
+          status = LogAggregationStatus.TIME_OUT; // copy only; store untouched
+        }
+        outputs.put(e.getKey(), LogAggregationReport.newInstance(applicationId,
+          e.getKey(), status, e.getValue().getDiagnosticMessage()));
+      }
+      return outputs;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Merges a report from the given node into this app's per-node state. A
+   * TIME_OUT status is only overwritten by FINISHED; non-empty diagnostics
+   * are appended. Does nothing when log aggregation is disabled.
+   */
+  public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
+    this.writeLock.lock(); // before try, so finally only unlocks a held lock
+    try {
+      if (!this.logAggregationEnabled) {
+        return;
+      }
+      LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+      if (curReport == null) {
+        this.logAggregationStatus.put(nodeId, report);
+        return;
+      }
+      LogAggregationStatus incoming = report.getLogAggregationStatus();
+      boolean timedOut = curReport.getLogAggregationStatus().equals(
+        LogAggregationStatus.TIME_OUT);
+      if (!timedOut || incoming.equals(LogAggregationStatus.FINISHED)) {
+        curReport.setLogAggregationStatus(incoming);
+      }
+      String added = report.getDiagnosticMessage();
+      if (added != null && !added.isEmpty()) {
+        String existing = curReport.getDiagnosticMessage();
+        curReport.setDiagnosticMessage(
+            existing == null ? added : existing + added);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index c556b80..ace2cf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -243,7 +247,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     this.stateMachine = stateMachineFactory.make(this);
     
-    this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();  
+    this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
   }
 
   @Override
@@ -773,6 +777,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
 
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+          statusEvent.getLogAggregationReportsForApps();
+      if (logAggregationReportsForApps != null
+          && !logAggregationReportsForApps.isEmpty()) {
+        rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
+      }
+
       if(rmNode.nextHeartBeat) {
         rmNode.nextHeartBeat = false;
         rmNode.context.getDispatcher().getEventHandler().handle(
@@ -903,4 +914,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  private void handleLogAggregationStatus(
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+    for (Entry<ApplicationId, LogAggregationReport> report :
+        logAggregationReportsForApps.entrySet()) {
+      RMApp rmApp = this.context.getRMApps().get(report.getKey());
+      if (rmApp instanceof RMAppImpl) { // null-safe; skips other RMApp types
+        ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue());
+      }
+    }
+  }
+
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index abfacbb..4bbf610 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 
@@ -32,6 +34,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   private final List<ContainerStatus> containersCollection;
   private final NodeHeartbeatResponse latestResponse;
   private final List<ApplicationId> keepAliveAppIds;
+  private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -41,6 +44,19 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     this.containersCollection = collection;
     this.keepAliveAppIds = keepAliveAppIds;
     this.latestResponse = latestResponse;
+    this.logAggregationReportsForApps = null;
+  }
+
+  /** Same as the five-argument constructor, plus per-app log reports. */
+  public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
+      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+      NodeHeartbeatResponse latestResponse,
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+    // Delegate the shared field setup rather than repeating it here; the
+    // delegate leaves the report map null, so assign it afterwards.
+    this(nodeId, nodeHealthStatus, collection, keepAliveAppIds,
+        latestResponse);
+    this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
 
   public NodeHealthStatus getNodeHealthStatus() {
@@ -58,4 +74,14 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   public List<ApplicationId> getKeepAliveAppIds() {
     return this.keepAliveAppIds;
   }
+
+  public Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps() {
+    return this.logAggregationReportsForApps; // null unless a map was supplied
+  }
+
+  public void setLogAggregationReportsForApps(
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+    this.logAggregationReportsForApps = logAggregationReportsForApps;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
new file mode 100644
index 0000000..ccb53dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+
+/** RM web page showing one application's log aggregation status. */
+public class AppLogAggregationStatusPage extends RmView {
+  /** Sets the page title from the application id request parameter. */
+  @Override
+  protected void preHead(Page.HTML<_> html) {
+    commonPreHead(html);
+    final String appId = $(YarnWebParams.APPLICATION_ID);
+    set(TITLE, appId.isEmpty()
+        ? "Bad request: missing application ID" : join("Application ", appId));
+  }
+
+  /** Supplies the block class that renders the page body. */
+  @Override
+  protected Class<? extends SubView> content() {
+    return RMAppLogAggregationStatusBlock.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
new file mode 100644
index 0000000..a95f76f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+/**
+ * RM web UI block: a legend of the log aggregation statuses followed by
+ * the per-node log aggregation reports of a single application.
+ */
+public class RMAppLogAggregationStatusBlock extends HtmlBlock {
+
+  private static final Log LOG = LogFactory
+    .getLog(RMAppLogAggregationStatusBlock.class);
+  private final ResourceManager rm;
+  private final Configuration conf;
+
+  @Inject
+  RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm,
+      Configuration conf) {
+    super(ctx);
+    this.rm = rm;
+    this.conf = conf;
+  }
+
+  /**
+   * Renders the status legend and the per-node reports for the application
+   * given by APPLICATION_ID; a missing or unparsable id only prints an error.
+   */
+  @Override
+  protected void render(Block html) {
+    String aid = $(APPLICATION_ID);
+    if (aid.isEmpty()) {
+      puts("Bad request: requires Application ID");
+      return;
+    }
+
+    ApplicationId appId;
+    try {
+      appId = Apps.toAppID(aid);
+    } catch (Exception e) {
+      puts("Invalid Application ID: " + aid);
+      return;
+    }
+
+    setTitle(join("Application ", aid));
+
+    // Legend table explaining each LogAggregationStatus value.
+    DIV<Hamlet> div_description = html.div(_INFO_WRAP);
+    TABLE<DIV<Hamlet>> table_description =
+        div_description.table("#LogAggregationStatusDecription");
+    table_description.tr().th(_TH, "Log Aggregation Status")
+      .th(_TH, "Description")._();
+    table_description.tr().td(LogAggregationStatus.DISABLED.name())
+      .td("Log Aggregation is Disabled.")._();
+    table_description.tr().td(LogAggregationStatus.NOT_START.name())
+      .td("Log Aggregation does not Start.")._();
+    table_description.tr().td(LogAggregationStatus.RUNNING.name())
+      .td("Log Aggregation is Running.")._();
+    table_description.tr().td(LogAggregationStatus.FINISHED.name())
+      .td("Log Aggregation is Finished. All of the logs have been "
+          + "aggregated successfully.")._();
+    table_description.tr().td(LogAggregationStatus.FAILED.name())
+      .td("Log Aggregation is Failed. At least one of the logs "
+          + "have not been aggregated.")._();
+    table_description.tr().td(LogAggregationStatus.TIME_OUT.name())
+      .td("Does not get the Log aggregation status for a long time. "
+          + "Not sure what is the current Log Aggregation Status.")._();
+    table_description._();
+    div_description._();
+
+    boolean logAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    // Per-node report table, headed by whether aggregation is enabled.
+    DIV<Hamlet> div = html.div(_INFO_WRAP);
+    TABLE<DIV<Hamlet>> table =
+        div.h3(
+          "Log Aggregation: "
+              + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+          "#LogAggregationStatus");
+    table.tr().th(_TH, "NodeId").th(_TH, "Log Aggregation Status")
+      .th(_TH, "Diagnostics Message")._();
+
+    RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+    if (rmApp != null) {
+      Map<NodeId, LogAggregationReport> logAggregationReports =
+          rmApp.getLogAggregationReportsForApp();
+      if (logAggregationReports != null && !logAggregationReports.isEmpty()) {
+        for (Entry<NodeId, LogAggregationReport> report :
+            logAggregationReports.entrySet()) {
+          // A missing report, status or message is shown as "N/A".
+          LogAggregationReport value = report.getValue();
+          LogAggregationStatus status =
+              value == null ? null : value.getLogAggregationStatus();
+          String message =
+              value == null ? null : value.getDiagnosticMessage();
+          table.tr()
+            .td(report.getKey().toString())
+            .td(status == null ? "N/A" : status.toString())
+            .td(message == null ? "N/A" : message)._();
+        }
+      }
+    }
+    table._();
+    div._();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
index 86300ce..a86ed4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
@@ -68,6 +68,8 @@ public class RMWebApp extends WebApp implements YarnWebParams {
       "appattempt");
     route(pajoin("/container", CONTAINER_ID), RmController.class, "container");
     route("/errors-and-warnings", RmController.class, "errorsAndWarnings");
+    route(pajoin("/logaggregationstatus", APPLICATION_ID),
+      RmController.class, "logaggregationstatus");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
index c8e3c5b..b124d75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
@@ -109,4 +109,8 @@ public class RmController extends Controller {
   public void errorsAndWarnings() {
     render(RMErrorsAndWarningsPage.class);
   }
+
+  public void logaggregationstatus() { // action wired up by RMWebApp's route()
+    render(AppLogAggregationStatusPage.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index f8d92aa..a6e469e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -190,6 +191,11 @@ public abstract class MockAsm extends MockApps {
     public ResourceRequest getAMResourceRequest() {
       return this.amReq; 
     }
+
+    @Override // stub: this mock always throws instead of returning reports
+    public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   public static RMApp newApplication(int i) {