You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/04/10 17:39:07 UTC
[1/2] hadoop git commit: YARN-1376. NM need to notify the log
aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.
Repository: hadoop
Updated Branches:
refs/heads/trunk 83979e61a -> 92431c961
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
new file mode 100644
index 0000000..7397d38
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestRMAppLogAggregationStatus {
+
+ private RMContext rmContext;
+ private YarnScheduler scheduler;
+
+ private SchedulerEventType eventType;
+
+ private ApplicationId appId;
+
+
+ private final class TestSchedulerEventDispatcher implements
+ EventHandler<SchedulerEvent> {
+ @Override
+ public void handle(SchedulerEvent event) {
+ scheduler.handle(event);
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ InlineDispatcher rmDispatcher = new InlineDispatcher();
+
+ rmContext =
+ new RMContextImpl(rmDispatcher, null, null, null,
+ null, null, null, null, null,
+ new RMApplicationHistoryWriter());
+ rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+
+ scheduler = mock(YarnScheduler.class);
+ doAnswer(
+ new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
+ eventType = event.getType();
+ if (eventType == SchedulerEventType.NODE_UPDATE) {
+ //DO NOTHING
+ }
+ return null;
+ }
+ }
+ ).when(scheduler).handle(any(SchedulerEvent.class));
+
+ rmDispatcher.register(SchedulerEventType.class,
+ new TestSchedulerEventDispatcher());
+
+ appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testLogAggregationStatus() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, 1500);
+ RMApp rmApp = createRMApp(conf);
+ this.rmContext.getRMApps().put(appId, rmApp);
+ rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.START));
+ rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_NEW_SAVED));
+ rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_ACCEPTED));
+
+ // This application will be running on two nodes
+ NodeId nodeId1 = NodeId.newInstance("localhost", 1234);
+ Resource capability = Resource.newInstance(4096, 4);
+ RMNodeImpl node1 =
+ new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
+ node1.handle(new RMNodeStartedEvent(nodeId1, null, null));
+ rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
+
+ NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
+ RMNodeImpl node2 =
+ new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
+ node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null));
+ rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
+
+ // The initial log aggregation status for these two nodes
+ // should be NOT_START
+ Map<NodeId, LogAggregationReport> logAggregationStatus =
+ rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+ for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ Assert.assertEquals(LogAggregationStatus.NOT_START, report.getValue()
+ .getLogAggregationStatus());
+ }
+
+ Map<ApplicationId, LogAggregationReport> node1ReportForApp =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode1_1 =
+ "node1 logAggregation status updated at " + System.currentTimeMillis();
+ LogAggregationReport report1 =
+ LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationStatus.RUNNING, messageForNode1_1);
+ node1ReportForApp.put(appId, report1);
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node1ReportForApp));
+
+ Map<ApplicationId, LogAggregationReport> node2ReportForApp =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode2_1 =
+ "node2 logAggregation status updated at " + System.currentTimeMillis();
+ LogAggregationReport report2 =
+ LogAggregationReport.newInstance(appId, nodeId2,
+ LogAggregationStatus.RUNNING, messageForNode2_1);
+ node2ReportForApp.put(appId, report2);
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node2ReportForApp));
+ // node1 and node2 have each updated their log aggregation status;
+ // verify that the log aggregation status for node1 and node2
+ // has been changed
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+ for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ if (report.getKey().equals(node1.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode1_1, report.getValue()
+ .getDiagnosticMessage());
+ } else if (report.getKey().equals(node2.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode2_1, report.getValue()
+ .getDiagnosticMessage());
+ } else {
+ // should not contain log aggregation report for other nodes
+ Assert
+ .fail("should not contain log aggregation report for other nodes");
+ }
+ }
+
+ // node1 updates its log aggregation status again
+ Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode1_2 =
+ "node1 logAggregation status updated at " + System.currentTimeMillis();
+ LogAggregationReport report1_2 =
+ LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationStatus.RUNNING, messageForNode1_2);
+ node1ReportForApp2.put(appId, report1_2);
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node1ReportForApp2));
+
+ // verify that the log aggregation status for node1
+ // has been changed
+ // verify that the log aggregation status for node2
+ // does not change
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+ for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ if (report.getKey().equals(node1.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
+ .getValue().getDiagnosticMessage());
+ } else if (report.getKey().equals(node2.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode2_1, report.getValue()
+ .getDiagnosticMessage());
+ } else {
+ // should not contain log aggregation report for other nodes
+ Assert
+ .fail("should not contain log aggregation report for other nodes");
+ }
+ }
+
+ // kill the application
+ rmApp.handle(new RMAppEvent(appId, RMAppEventType.KILL));
+ rmApp.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_KILLED));
+ rmApp.handle(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED));
+ Assert.assertEquals(RMAppState.KILLED, rmApp.getState());
+
+ // wait for 1500 ms
+ Thread.sleep(1500);
+
+ // the log aggregation status for both nodes should be changed
+ // to TIME_OUT
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+ for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+ .getLogAggregationStatus());
+ }
+
+ // Finally, node1 finished its log aggregation and sent out its final
+ // log aggregation status. The log aggregation status for node1 should
+ // be changed from TIME_OUT to FINISHED
+ Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode1_3 =
+ "node1 final logAggregation status updated at "
+ + System.currentTimeMillis();
+ LogAggregationReport report1_3 =
+ LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationStatus.FINISHED, messageForNode1_3);
+ node1ReportForApp3.put(appId, report1_3);
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node1ReportForApp3));
+
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+ for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ if (report.getKey().equals(node1.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode1_1 + messageForNode1_2
+ + messageForNode1_3, report.getValue().getDiagnosticMessage());
+ } else if (report.getKey().equals(node2.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+ .getLogAggregationStatus());
+ } else {
+ // should not contain log aggregation report for other nodes
+ Assert
+ .fail("should not contain log aggregation report for other nodes");
+ }
+ }
+ }
+
+ private RMApp createRMApp(Configuration conf) {
+ ApplicationSubmissionContext submissionContext =
+ ApplicationSubmissionContext.newInstance(appId, "test", "default",
+ Priority.newInstance(0), null, false, true,
+ 2, Resource.newInstance(10, 2), "test");
+ return new RMAppImpl(this.appId, this.rmContext,
+ conf, "test", "test", "default", submissionContext,
+ this.rmContext.getScheduler(),
+ this.rmContext.getApplicationMasterService(),
+ System.currentTimeMillis(), "test",
+ null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ec990f9..81de286 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -271,4 +272,9 @@ public class MockRMApp implements RMApp {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ @Override
+ public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
[2/2] hadoop git commit: YARN-1376. NM need to notify the log
aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.
Posted by ju...@apache.org.
YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92431c96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92431c96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92431c96
Branch: refs/heads/trunk
Commit: 92431c961741747b5d6442f4025016d48d9a6863
Parents: 83979e6
Author: Junping Du <ju...@apache.org>
Authored: Fri Apr 10 08:56:18 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri Apr 10 08:56:18 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 11 +
.../protocolrecords/LogAggregationReport.java | 104 ++++++
.../protocolrecords/NodeHeartbeatRequest.java | 8 +
.../impl/pb/LogAggregationReportPBImpl.java | 227 +++++++++++++
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 82 ++++-
.../api/records/LogAggregationStatus.java | 31 ++
.../hadoop/yarn/server/webapp/AppBlock.java | 14 +-
.../main/proto/yarn_server_common_protos.proto | 8 +
.../yarn_server_common_service_protos.proto | 13 +
.../hadoop/yarn/server/nodemanager/Context.java | 5 +
.../yarn/server/nodemanager/NodeManager.java | 12 +
.../nodemanager/NodeStatusUpdaterImpl.java | 72 ++++-
.../logaggregation/AppLogAggregatorImpl.java | 34 +-
.../resourcemanager/ResourceTrackerService.java | 12 +-
.../server/resourcemanager/rmapp/RMApp.java | 3 +
.../server/resourcemanager/rmapp/RMAppImpl.java | 88 +++++
.../resourcemanager/rmnode/RMNodeImpl.java | 24 +-
.../rmnode/RMNodeStatusEvent.java | 26 ++
.../webapp/AppLogAggregationStatusPage.java | 41 +++
.../webapp/RMAppLogAggregationStatusBlock.java | 148 +++++++++
.../server/resourcemanager/webapp/RMWebApp.java | 2 +
.../resourcemanager/webapp/RmController.java | 4 +
.../applicationsmanager/MockAsm.java | 6 +
.../TestRMAppLogAggregationStatus.java | 318 +++++++++++++++++++
.../server/resourcemanager/rmapp/MockRMApp.java | 6 +
26 files changed, 1291 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 72af3ba..edd1125 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -60,6 +60,9 @@ Release 2.8.0 - UNRELEASED
container-executor for outbound network traffic control. (Sidharta Seethana
via vinodkv)
+ YARN-1376. NM need to notify the log aggregation status to RM through
+ heartbeat. (Xuan Gong via junping_du)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 13e9a10..253ae08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -742,6 +742,17 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
/**
+ * How long, in milliseconds, the ResourceManager waits for a NodeManager
+ * to report its log aggregation status. If the NodeManager has not reported
+ * its status within the configured time, the RM will report the
+ * log aggregation status for this NodeManager as TIME_OUT.
+ */
+ public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS =
+ YARN_PREFIX + "log-aggregation-status.time-out.ms";
+ public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
+ = 10 * 60 * 1000;
+
+ /**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
new file mode 100644
index 0000000..808804b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code LogAggregationReport} is a report for log aggregation status
+ * in one NodeManager of an application.
+ * <p>
+ * It includes details such as:
+ * <ul>
+ * <li>{@link ApplicationId} of the application.</li>
+ * <li>{@link NodeId} of the NodeManager.</li>
+ * <li>{@link LogAggregationStatus}</li>
+ * <li>Diagnostic information</li>
+ * </ul>
+ *
+ */
+@Public
+@Unstable
+public abstract class LogAggregationReport {
+
+ /**
+ * Create a <code>LogAggregationReport</code> populated with the given
+ * application, node, status and diagnostic message.
+ * @param appId <code>ApplicationId</code> of the application
+ * @param nodeId <code>NodeId</code> of the NodeManager
+ * @param status <code>LogAggregationStatus</code> to report
+ * @param diagnosticMessage diagnostic information for this report
+ * @return the newly created <code>LogAggregationReport</code>
+ */
+ @Public
+ @Unstable
+ public static LogAggregationReport newInstance(ApplicationId appId,
+ NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+ LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
+ report.setApplicationId(appId);
+ // Store the node id as well so that getNodeId() reflects the argument
+ // the caller supplied instead of being left unset.
+ report.setNodeId(nodeId);
+ report.setLogAggregationStatus(status);
+ report.setDiagnosticMessage(diagnosticMessage);
+ return report;
+ }
+
+ /**
+ * Get the <code>ApplicationId</code> of the application.
+ * @return <code>ApplicationId</code> of the application
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationId getApplicationId();
+
+ @Public
+ @Unstable
+ public abstract void setApplicationId(ApplicationId appId);
+
+ /**
+ * Get the <code>NodeId</code>.
+ * @return <code>NodeId</code>
+ */
+ @Public
+ @Unstable
+ public abstract NodeId getNodeId();
+
+ @Public
+ @Unstable
+ public abstract void setNodeId(NodeId nodeId);
+
+ /**
+ * Get the <code>LogAggregationStatus</code>.
+ * @return <code>LogAggregationStatus</code>
+ */
+ @Public
+ @Unstable
+ public abstract LogAggregationStatus getLogAggregationStatus();
+
+ @Public
+ @Unstable
+ public abstract void setLogAggregationStatus(
+ LogAggregationStatus logAggregationStatus);
+
+ /**
+ * Get the <em>diagnostic information</em> of this log aggregation.
+ * @return <em>diagnostic information</em> of this log aggregation
+ */
+ @Public
+ @Unstable
+ public abstract String getDiagnosticMessage();
+
+ @Public
+ @Unstable
+ public abstract void setDiagnosticMessage(String diagnosticMessage);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index b80d9ce..227363f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@@ -51,4 +53,10 @@ public abstract class NodeHeartbeatRequest {
public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);
+
+ public abstract Map<ApplicationId, LogAggregationReport>
+ getLogAggregationReportsForApps();
+
+ public abstract void setLogAggregationReportsForApps(
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
new file mode 100644
index 0000000..7999fa7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogAggregationReportPBImpl extends LogAggregationReport {
+
+ LogAggregationReportProto proto = LogAggregationReportProto
+ .getDefaultInstance();
+ LogAggregationReportProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
+
+ private ApplicationId applicationId;
+ private NodeId nodeId;
+
+ public LogAggregationReportPBImpl() {
+ builder = LogAggregationReportProto.newBuilder();
+ }
+
+ public LogAggregationReportPBImpl(LogAggregationReportProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogAggregationReportProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.applicationId != null
+ && !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
+ builder.getApplicationId())) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+
+ if (this.nodeId != null
+ && !((NodeIdPBImpl) this.nodeId).getProto().equals(
+ builder.getNodeId())) {
+ builder.setNodeId(convertToProtoFormat(this.nodeId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogAggregationReportProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return this.applicationId;
+ }
+
+ @Override
+ public void setApplicationId(ApplicationId appId) {
+ maybeInitBuilder();
+ if (appId == null)
+ builder.clearApplicationId();
+ this.applicationId = appId;
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(
+ ApplicationIdProto applicationId) {
+ return new ApplicationIdPBImpl(applicationId);
+ }
+
+ @Override
+ public LogAggregationStatus getLogAggregationStatus() {
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasLogAggregationStatus()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationStatus());
+ }
+
+ @Override
+ public void
+ setLogAggregationStatus(LogAggregationStatus logAggregationStatus) {
+ maybeInitBuilder();
+ if (logAggregationStatus == null) {
+ builder.clearLogAggregationStatus();
+ return;
+ }
+ builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus));
+ }
+
+ /**
+  * Converts a protobuf status to the record-level enum by name: every
+  * occurrence of LOGAGGREGATION_STATUS_PREFIX is removed from the protobuf
+  * constant name and the remainder is looked up in LogAggregationStatus.
+  */
+ private LogAggregationStatus convertFromProtoFormat(
+     LogAggregationStatusProto s) {
+   String recordName = s.name().replace(LOGAGGREGATION_STATUS_PREFIX, "");
+   return LogAggregationStatus.valueOf(recordName);
+ }
+
+ /**
+  * Converts a record-level status to its protobuf form by name: the
+  * protobuf constant looked up is LOGAGGREGATION_STATUS_PREFIX followed by
+  * the Java constant name, so the lookup only succeeds when the protobuf
+  * enum declares a constant with exactly that name.
+  */
+ private LogAggregationStatusProto
+     convertToProtoFormat(LogAggregationStatus s) {
+   String protoName = LOGAGGREGATION_STATUS_PREFIX + s.name();
+   return LogAggregationStatusProto.valueOf(protoName);
+ }
+
+ @Override
+ public String getDiagnosticMessage() {
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasDiagnostics()) {
+ return null;
+ }
+ return p.getDiagnostics();
+ }
+
+ @Override
+ public void setDiagnosticMessage(String diagnosticMessage) {
+ maybeInitBuilder();
+ if (diagnosticMessage == null) {
+ builder.clearDiagnostics();
+ return;
+ }
+ builder.setDiagnostics(diagnosticMessage);
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ if (this.nodeId != null) {
+ return this.nodeId;
+ }
+
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeId()) {
+ return null;
+ }
+ this.nodeId = convertFromProtoFormat(p.getNodeId());
+ return this.nodeId;
+ }
+
+ @Override
+ public void setNodeId(NodeId nodeId) {
+ maybeInitBuilder();
+ if (nodeId == null)
+ builder.clearNodeId();
+ this.nodeId = nodeId;
+ }
+
+ private NodeIdProto convertToProtoFormat(NodeId t) {
+ return ((NodeIdPBImpl) t).getProto();
+ }
+
+ private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
+ return new NodeIdPBImpl(nodeId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 16d47f9..03db39c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,15 +18,24 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -42,6 +51,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null;
+ private Map<ApplicationId, LogAggregationReport>
+ logAggregationReportsForApps = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@@ -91,6 +102,25 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
+ if (this.logAggregationReportsForApps != null) {
+ addLogAggregationStatusForAppsToProto();
+ }
+ }
+
+ /**
+  * Replaces the repeated log-aggregation entries in the builder with the
+  * contents of the local logAggregationReportsForApps map, one entry per
+  * (application id, report) pair.
+  */
+ private void addLogAggregationStatusForAppsToProto() {
+   maybeInitBuilder();
+   builder.clearLogAggregationReportsForApps();
+   for (Entry<ApplicationId, LogAggregationReport> appReport
+       : logAggregationReportsForApps.entrySet()) {
+     LogAggregationReportsForAppsProto.Builder item =
+         LogAggregationReportsForAppsProto.newBuilder();
+     item.setAppId(convertToProtoFormat(appReport.getKey()));
+     item.setLogAggregationReport(convertToProtoFormat(appReport.getValue()));
+     builder.addLogAggregationReportsForApps(item);
+   }
+ }
+
+ private LogAggregationReportProto convertToProtoFormat(
+ LogAggregationReport value) {
+ return ((LogAggregationReportPBImpl) value).getProto();
}
private void mergeLocalToProto() {
@@ -215,4 +245,54 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet<String>(nodeLabels.getElementsList());
}
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ /**
+  * Returns the per-application log aggregation reports, decoding them from
+  * the underlying proto/builder on first access and caching the result.
+  */
+ @Override
+ public Map<ApplicationId, LogAggregationReport>
+     getLogAggregationReportsForApps() {
+   if (this.logAggregationReportsForApps == null) {
+     initLogAggregationReportsForApps();
+   }
+   return this.logAggregationReportsForApps;
+ }
+
+ /**
+  * Rebuilds the local logAggregationReportsForApps map from the repeated
+  * entries held by the proto (when viaProto is set) or by the builder.
+  */
+ private void initLogAggregationReportsForApps() {
+   NodeHeartbeatRequestProtoOrBuilder source = viaProto ? proto : builder;
+   List<LogAggregationReportsForAppsProto> entries =
+       source.getLogAggregationReportsForAppsList();
+   this.logAggregationReportsForApps =
+       new HashMap<ApplicationId, LogAggregationReport>();
+   for (LogAggregationReportsForAppsProto item : entries) {
+     this.logAggregationReportsForApps.put(
+         convertFromProtoFormat(item.getAppId()),
+         convertFromProtoFormat(item.getLogAggregationReport()));
+   }
+ }
+
+ private LogAggregationReport convertFromProtoFormat(
+ LogAggregationReportProto logAggregationReport) {
+ return new LogAggregationReportPBImpl(logAggregationReport);
+ }
+
+ /**
+  * Stores a copy of the given per-application reports on this request.
+  * A null or empty map is ignored: the call returns without touching any
+  * reports that were set earlier.
+  *
+  * @param logAggregationStatusForApps reports keyed by application id
+  */
+ @Override
+ public void setLogAggregationReportsForApps(
+     Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
+   boolean hasReports = logAggregationStatusForApps != null
+       && !logAggregationStatusForApps.isEmpty();
+   if (hasReports) {
+     maybeInitBuilder();
+     this.logAggregationReportsForApps =
+         new HashMap<ApplicationId, LogAggregationReport>(
+             logAggregationStatusForApps);
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
new file mode 100644
index 0000000..496767f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+/**
+ * <p>Status of Log aggregation.</p>
+ *
+ * <p>LogAggregationReportPBImpl converts these constants to and from
+ * LogAggregationStatusProto purely by name, using the prefix "LOG_". A
+ * constant that has no LOG_-prefixed protobuf counterpart cannot be
+ * converted by that code.</p>
+ */
+public enum LogAggregationStatus {
+ DISABLED,
+ NOT_START,
+ RUNNING,
+ FINISHED,
+ FAILED,
+ TIME_OUT
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
index ae4737d..d5a3dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.WEB_UI_TYPE;
+
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
+
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -154,7 +157,7 @@ public class AppBlock extends HtmlBlock {
html.script().$type("text/javascript")._(script.toString())._();
}
- info("Application Overview")
+ ResponseInfo overviewTable = info("Application Overview")
._("User:", app.getUser())
._("Name:", app.getName())
._("Application Type:", app.getType())
@@ -181,8 +184,13 @@ public class AppBlock extends HtmlBlock {
.getAppState() == YarnApplicationState.FINISHED
|| app.getAppState() == YarnApplicationState.FAILED
|| app.getAppState() == YarnApplicationState.KILLED ? "History"
- : "ApplicationMaster")
- ._("Diagnostics:",
+ : "ApplicationMaster");
+ if (webUiType != null
+ && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
+ overviewTable._("Log Aggregation Status",
+ root_url("logaggregationstatus", app.getAppId()), "Status");
+ }
+ overviewTable._("Diagnostics:",
app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
Collection<ApplicationAttemptReport> attempts;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 01fac32..6e9f4cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -54,3 +54,11 @@ message VersionProto {
optional int32 minor_version = 2;
}
+// Wire form of org.apache.hadoop.yarn.server.api.records.LogAggregationStatus.
+// LogAggregationReportPBImpl converts between the two by constant name
+// ("LOG_" + Java constant name), so every Java constant needs a
+// LOG_-prefixed value in this enum.
+enum LogAggregationStatusProto {
+ LOG_DISABLED = 1;
+ LOG_NOT_START = 2;
+ LOG_RUNNING = 3;
+ LOG_FINISHED = 4;
+ LOG_TIME_OUT = 5;
+ // Counterpart of LogAggregationStatus.FAILED. Appended with a new number
+ // so the values above keep their existing numbers.
+ LOG_FAILED = 6;
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d8c92c4..3103582 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -50,6 +50,19 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
optional StringArrayProto nodeLabels = 4;
+ repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
+}
+
+// One entry of NodeHeartbeatRequestProto.log_aggregation_reports_for_apps:
+// an application id paired with the log aggregation report for it.
+message LogAggregationReportsForAppsProto {
+ optional ApplicationIdProto appId = 1;
+ optional LogAggregationReportProto log_aggregation_report = 2;
+}
+
+message LogAggregationReportProto {
+optional ApplicationIdProto application_id = 1;
+optional NodeIdProto node_id = 2;
+optional LogAggregationStatusProto log_aggregation_status = 3;
+optional string diagnostics = 4 [default = "N/A"];
}
message NodeHeartbeatResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 6e7e2ec..42a4234 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.security.Credentials;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -77,4 +79,7 @@ public interface Context {
boolean getDecommissioned();
void setDecommissioned(boolean isDecommissioned);
+
+ ConcurrentLinkedQueue<LogAggregationReport>
+ getLogAggregationStatusForApps();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 9831fc4..0bac8d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -384,6 +386,8 @@ public class NodeManager extends CompositeService
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
private final NMStateStoreService stateStore;
private boolean isDecommissioned = false;
+ private final ConcurrentLinkedQueue<LogAggregationReport>
+ logAggregationReportForApps;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
@@ -397,6 +401,8 @@ public class NodeManager extends CompositeService
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
this.stateStore = stateStore;
+ this.logAggregationReportForApps = new ConcurrentLinkedQueue<
+ LogAggregationReport>();
}
/**
@@ -488,6 +494,12 @@ public class NodeManager extends CompositeService
Map<ApplicationId, Credentials> systemCredentials) {
this.systemCredentials = systemCredentials;
}
+
+ @Override
+ public ConcurrentLinkedQueue<LogAggregationReport>
+ getLogAggregationStatusForApps() {
+ return this.logAggregationReportForApps;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 2549e0f..b1ab5f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Random;
import java.util.Set;
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@@ -115,6 +118,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers;
+ private boolean logAggregationEnabled;
+
+ private final List<LogAggregationReport> logAggregationReportForAppsTempList;
+
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -144,6 +151,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
this.pendingCompletedContainers =
new HashMap<ContainerId, ContainerStatus>();
+ this.logAggregationReportForAppsTempList =
+ new ArrayList<LogAggregationReport>();
}
@Override
@@ -193,6 +202,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.info("Initialized nodemanager for " + nodeId + ":" +
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" virtual-cores=" + virtualCores);
+
+ this.logAggregationEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
}
@Override
@@ -649,6 +662,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat);
+
+ if (logAggregationEnabled) {
+ // pull log aggregation status for application running in this NM
+ Map<ApplicationId, LogAggregationReport> logAggregationReports =
+ getLogAggregationReportsForApps(context
+ .getLogAggregationStatusForApps());
+ if (logAggregationReports != null
+ && !logAggregationReports.isEmpty()) {
+ request.setLogAggregationReportsForApps(logAggregationReports);
+ }
+ }
+
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -698,6 +723,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
+ logAggregationReportForAppsTempList.clear();
lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
@@ -782,6 +808,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
}
-
-
+
+ /**
+  * Polls the given queue until it is empty and collapses the pending
+  * reports into a single report per application.
+  *
+  * Polled reports are appended to logAggregationReportForAppsTempList, which
+  * is not cleared here, so anything still held in that list is merged again
+  * together with the newly polled reports. When an application appears more
+  * than once, the status of the last report wins and non-empty diagnostic
+  * messages are concatenated in order without a separator.
+  *
+  * @param pendingReports queue of reports to poll
+  * @return merged reports keyed by application id; empty if there are none
+  */
+ private Map<ApplicationId, LogAggregationReport>
+     getLogAggregationReportsForApps(
+         ConcurrentLinkedQueue<LogAggregationReport> pendingReports) {
+   LogAggregationReport polled;
+   while ((polled = pendingReports.poll()) != null) {
+     this.logAggregationReportForAppsTempList.add(polled);
+   }
+
+   Map<ApplicationId, LogAggregationReport> mergedByApp =
+       new HashMap<ApplicationId, LogAggregationReport>();
+   for (LogAggregationReport incoming
+       : this.logAggregationReportForAppsTempList) {
+     ApplicationId appId = incoming.getApplicationId();
+     LogAggregationReport merged = mergedByApp.get(appId);
+     if (merged == null) {
+       // First report for this application: start a fresh record that
+       // carries this node's id.
+       merged = Records.newRecord(LogAggregationReport.class);
+       merged.setApplicationId(appId);
+       merged.setNodeId(this.nodeId);
+       merged.setLogAggregationStatus(incoming.getLogAggregationStatus());
+       merged.setDiagnosticMessage(incoming.getDiagnosticMessage());
+     } else {
+       // Later report for the same application: newest status wins and
+       // any non-empty diagnostics are appended.
+       merged.setLogAggregationStatus(incoming.getLogAggregationStatus());
+       String extra = incoming.getDiagnosticMessage();
+       if (extra != null && !extra.isEmpty()) {
+         String soFar = merged.getDiagnosticMessage();
+         merged.setDiagnosticMessage(soFar == null ? extra : soFar + extra);
+       }
+     }
+     mergedByApp.put(appId, merged);
+   }
+   return mergedByApp;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 393576b..bf7d5f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -57,12 +57,16 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
@@ -120,6 +124,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// This variable is only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
+ private boolean renameTemporaryLogFileFailed = false;
+
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
new HashMap<ContainerId, ContainerLogAggregator>();
@@ -292,12 +298,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
writer.close();
}
+ long currentTime = System.currentTimeMillis();
final Path renamedPath = this.rollingMonitorInterval <= 0
? remoteNodeLogFileForApp : new Path(
remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_"
- + System.currentTimeMillis());
+ + currentTime);
+ String diagnosticMessage = "";
final boolean rename = uploadedLogsInThisCycle;
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -314,12 +322,36 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return null;
}
});
+ diagnosticMessage =
+ "Log uploaded successfully for Application: " + appId
+ + " in NodeManager: "
+ + LogAggregationUtils.getNodeString(nodeId) + " at "
+ + Times.format(currentTime) + "\n";
} catch (Exception e) {
LOG.error(
"Failed to move temporary log file to final location: ["
+ remoteNodeTmpLogFileForApp + "] to ["
+ renamedPath + "]", e);
+ diagnosticMessage =
+ "Log uploaded failed for Application: " + appId
+ + " in NodeManager: "
+ + LogAggregationUtils.getNodeString(nodeId) + " at "
+ + Times.format(currentTime) + "\n";
+ renameTemporaryLogFileFailed = true;
+ }
+
+ LogAggregationReport report =
+ Records.newRecord(LogAggregationReport.class);
+ report.setApplicationId(appId);
+ report.setNodeId(nodeId);
+ report.setDiagnosticMessage(diagnosticMessage);
+ if (appFinished) {
+ report.setLogAggregationStatus(renameTemporaryLogFileFailed
+ ? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED);
+ } else {
+ report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
}
+ this.context.getLogAggregationStatusForApps().add(report);
} finally {
if (writer != null) {
writer.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 22efe25..5e2dc7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -458,10 +458,16 @@ public class ResourceTrackerService extends AbstractService implements
}
// 4. Send status to RMNode, saving the latest response.
- this.rmContext.getDispatcher().getEventHandler().handle(
+ RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
- remoteNodeStatus.getContainersStatuses(),
- remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+ remoteNodeStatus.getContainersStatuses(),
+ remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
+ if (request.getLogAggregationReportsForApps() != null
+ && !request.getLogAggregationReportsForApps().isEmpty()) {
+ nodeStatusEvent.setLogAggregationReportsForApps(request
+ .getLogAggregationReportsForApps());
+ }
+ this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
// 5. Update node's labels to RM's NodeLabelManager.
if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fbcaab9..33eedbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -242,4 +243,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
ReservationId getReservationId();
ResourceRequest getAMResourceRequest();
+
+ Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d1737a..47c4807 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -25,9 +25,11 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -142,6 +146,12 @@ public class RMAppImpl implements RMApp, Recoverable {
new AppFinishedTransition();
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
+ private final boolean logAggregationEnabled;
+ private long logAggregationStartTime = 0;
+ private final long logAggregationStatusTimeout;
+ private final Map<NodeId, LogAggregationReport> logAggregationStatus =
+ new HashMap<NodeId, LogAggregationReport>();
+
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
private RMAppState stateBeforeFinalSaving;
@@ -413,6 +423,19 @@ public class RMAppImpl implements RMApp, Recoverable {
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
+
+ long localLogAggregationStatusTimeout =
+ conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+ if (localLogAggregationStatusTimeout <= 0) {
+ this.logAggregationStatusTimeout =
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+ } else {
+ this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
+ }
+ this.logAggregationEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
}
@Override
@@ -803,6 +826,12 @@ public class RMAppImpl implements RMApp, Recoverable {
// otherwise, add it to ranNodes for further process
app.ranNodes.add(nodeAddedEvent.getNodeId());
+
+ app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
+ LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
+ .getNodeId(), app.logAggregationEnabled
+ ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
+ ""));
};
}
@@ -1153,6 +1182,7 @@ public class RMAppImpl implements RMApp, Recoverable {
}
public void transition(RMAppImpl app, RMAppEvent event) {
+ app.logAggregationStartTime = System.currentTimeMillis();
for (NodeId nodeId : app.getRanNodes()) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
@@ -1356,4 +1386,62 @@ public class RMAppImpl implements RMApp, Recoverable {
}
return credentials;
}
+
+ @Override
+ public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ try {
+ this.readLock.lock();
+ Map<NodeId, LogAggregationReport> outputs =
+ new HashMap<NodeId, LogAggregationReport>();
+ outputs.putAll(logAggregationStatus);
+ for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+ if (!output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.TIME_OUT)
+ && !output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.FINISHED)
+ && isAppInFinalState(this)
+ && System.currentTimeMillis() > this.logAggregationStartTime
+ + this.logAggregationStatusTimeout) {
+ output.getValue().setLogAggregationStatus(
+ LogAggregationStatus.TIME_OUT);
+ }
+ }
+ return outputs;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
+ try {
+ this.writeLock.lock();
+ if (this.logAggregationEnabled) {
+ LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+ if (curReport == null) {
+ this.logAggregationStatus.put(nodeId, report);
+ } else {
+ if (curReport.getLogAggregationStatus().equals(
+ LogAggregationStatus.TIME_OUT)) {
+ if (report.getLogAggregationStatus().equals(
+ LogAggregationStatus.FINISHED)) {
+ curReport.setLogAggregationStatus(report
+ .getLogAggregationStatus());
+ }
+ } else {
+ curReport.setLogAggregationStatus(report.getLogAggregationStatus());
+ }
+
+ if (report.getDiagnosticMessage() != null
+ && !report.getDiagnosticMessage().isEmpty()) {
+ curReport
+ .setDiagnosticMessage(curReport.getDiagnosticMessage() == null
+ ? report.getDiagnosticMessage() : curReport
+ .getDiagnosticMessage() + report.getDiagnosticMessage());
+ }
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index c556b80..ace2cf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -243,7 +247,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.stateMachine = stateMachineFactory.make(this);
- this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
+ this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
}
@Override
@@ -773,6 +777,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.handleContainerStatus(statusEvent.getContainers());
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+ statusEvent.getLogAggregationReportsForApps();
+ if (logAggregationReportsForApps != null
+ && !logAggregationReportsForApps.isEmpty()) {
+ rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
+ }
+
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
@@ -903,4 +914,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
+   /**
+    * Forwards the log aggregation reports received from this node to the
+    * applications they belong to. Reports for applications that are not in
+    * the RM context are dropped.
+    *
+    * @param logAggregationReportsForApps reports keyed by application id
+    */
+   private void handleLogAggregationStatus(
+       Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+     for (Entry<ApplicationId, LogAggregationReport> report :
+         logAggregationReportsForApps.entrySet()) {
+       RMApp rmApp = this.context.getRMApps().get(report.getKey());
+       // aggregateLogReport is declared on RMAppImpl, not on the RMApp
+       // interface. Skip any other implementation (and null) instead of
+       // failing the status update with a ClassCastException.
+       if (rmApp instanceof RMAppImpl) {
+         ((RMAppImpl) rmApp).aggregateLogReport(this.nodeId,
+             report.getValue());
+       }
+     }
+   }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index abfacbb..4bbf610 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -32,6 +34,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
private final List<ContainerStatus> containersCollection;
private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
+ private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -41,6 +44,19 @@ public class RMNodeStatusEvent extends RMNodeEvent {
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
+ this.logAggregationReportsForApps = null;
+ }
+
+   /**
+    * Creates a status-update event that additionally carries the log
+    * aggregation reports sent by the node.
+    *
+    * @param logAggregationReportsForApps reports keyed by application id
+    */
+   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
+       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+       NodeHeartbeatResponse latestResponse,
+       Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+     // Reuse the five-argument constructor for the common fields, then
+     // replace the null it leaves in the reports field.
+     this(nodeId, nodeHealthStatus, collection, keepAliveAppIds,
+         latestResponse);
+     this.logAggregationReportsForApps = logAggregationReportsForApps;
    }
public NodeHealthStatus getNodeHealthStatus() {
@@ -58,4 +74,14 @@ public class RMNodeStatusEvent extends RMNodeEvent {
public List<ApplicationId> getKeepAliveAppIds() {
return this.keepAliveAppIds;
}
+
+ /**
+ * Returns the log aggregation reports attached to this event, keyed by
+ * application id.
+ *
+ * @return the attached reports, or {@code null} if none were attached
+ */
+ public Map<ApplicationId, LogAggregationReport>
+ getLogAggregationReportsForApps() {
+ return this.logAggregationReportsForApps;
+ }
+
+ /**
+ * Attaches log aggregation reports to this event, replacing any reports
+ * attached earlier.
+ *
+ * @param logAggregationReportsForApps reports keyed by application id
+ */
+ public void setLogAggregationReportsForApps(
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+ this.logAggregationReportsForApps = logAggregationReportsForApps;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
new file mode 100644
index 0000000..ccb53dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+
+ /**
+  * RM web page for the log aggregation status of a single application. The
+  * page content is rendered by {@link RMAppLogAggregationStatusBlock}.
+  */
+ public class AppLogAggregationStatusPage extends RmView {
+
+   /**
+    * Sets the page title from the application id request parameter, or to an
+    * error text when that parameter is empty.
+    */
+   @Override
+   protected void preHead(Page.HTML<_> html) {
+     commonPreHead(html);
+     String appId = $(YarnWebParams.APPLICATION_ID);
+     String title;
+     if (appId.isEmpty()) {
+       title = "Bad request: missing application ID";
+     } else {
+       title = join("Application ", $(YarnWebParams.APPLICATION_ID));
+     }
+     set(TITLE, title);
+   }
+
+   /** @return the block class that renders the page content */
+   @Override
+   protected Class<? extends SubView> content() {
+     return RMAppLogAggregationStatusBlock.class;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
new file mode 100644
index 0000000..a95f76f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+ /**
+  * Web UI block showing the log aggregation status of one application: a
+  * legend describing every {@link LogAggregationStatus} value, followed by a
+  * table with the status and diagnostic message reported for each node.
+  */
+ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
+
+   private static final Log LOG = LogFactory
+       .getLog(RMAppLogAggregationStatusBlock.class);
+   private final ResourceManager rm;
+   private final Configuration conf;
+
+   @Inject
+   RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm,
+       Configuration conf) {
+     super(ctx);
+     this.rm = rm;
+     this.conf = conf;
+   }
+
+   /**
+    * Renders the block for the application named by the
+    * {@code APPLICATION_ID} request parameter. Prints an error line and
+    * renders nothing else when the parameter is empty or cannot be parsed.
+    */
+   @Override
+   protected void render(Block html) {
+     String aid = $(APPLICATION_ID);
+     if (aid.isEmpty()) {
+       puts("Bad request: requires Application ID");
+       return;
+     }
+
+     ApplicationId appId;
+     try {
+       appId = Apps.toAppID(aid);
+     } catch (Exception e) {
+       puts("Invalid Application ID: " + aid);
+       return;
+     }
+
+     setTitle(join("Application ", aid));
+     renderStatusDescriptions(html);
+     renderNodeReports(html, appId);
+   }
+
+   /** Renders the legend explaining each LogAggregationStatus value. */
+   private void renderStatusDescriptions(Block html) {
+     DIV<Hamlet> div = html.div(_INFO_WRAP);
+     // The id spelling is kept as is; it is part of the rendered markup.
+     TABLE<DIV<Hamlet>> table =
+         div.table("#LogAggregationStatusDecription");
+     table.
+       tr().
+         th(_TH, "Log Aggregation Status").
+         th(_TH, "Description").
+       _();
+     addDescription(table, LogAggregationStatus.DISABLED,
+         "Log Aggregation is Disabled.");
+     addDescription(table, LogAggregationStatus.NOT_START,
+         "Log Aggregation does not Start.");
+     addDescription(table, LogAggregationStatus.RUNNING,
+         "Log Aggregation is Running.");
+     addDescription(table, LogAggregationStatus.FINISHED,
+         "Log Aggregation is Finished. All of the logs have been "
+             + "aggregated successfully.");
+     addDescription(table, LogAggregationStatus.FAILED,
+         "Log Aggregation is Failed. At least one of the logs "
+             + "have not been aggregated.");
+     addDescription(table, LogAggregationStatus.TIME_OUT,
+         "Does not get the Log aggregation status for a long time. "
+             + "Not sure what is the current Log Aggregation Status.");
+     table._();
+     div._();
+   }
+
+   /** Appends one "status name / description" row to the legend table. */
+   private static void addDescription(TABLE<DIV<Hamlet>> table,
+       LogAggregationStatus status, String description) {
+     table.tr().td(status.name()).td(description)._();
+   }
+
+   /**
+    * Renders the per-node status table of the given application. Only the
+    * table header is rendered when the application is unknown to the RM or
+    * has no reports.
+    */
+   private void renderNodeReports(Block html, ApplicationId appId) {
+     boolean logAggregationEnabled =
+         conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+           YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+     DIV<Hamlet> div = html.div(_INFO_WRAP);
+     TABLE<DIV<Hamlet>> table =
+         div.h3(
+           "Log Aggregation: "
+               + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+           "#LogAggregationStatus");
+     table.
+       tr().
+         th(_TH, "NodeId").
+         th(_TH, "Log Aggregation Status").
+         th(_TH, "Diagnostics Message").
+       _();
+
+     RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+     Map<NodeId, LogAggregationReport> logAggregationReports =
+         rmApp == null ? null : rmApp.getLogAggregationReportsForApp();
+     if (logAggregationReports != null) {
+       for (Entry<NodeId, LogAggregationReport> entry :
+           logAggregationReports.entrySet()) {
+         LogAggregationReport report = entry.getValue();
+         LogAggregationStatus status =
+             report == null ? null : report.getLogAggregationStatus();
+         String message =
+             report == null ? null : report.getDiagnosticMessage();
+         table.tr()
+           .td(entry.getKey().toString())
+           .td(status == null ? "N/A" : status.toString())
+           .td(message == null ? "N/A" : message)._();
+       }
+     }
+     table._();
+     div._();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
index 86300ce..a86ed4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
@@ -68,6 +68,8 @@ public class RMWebApp extends WebApp implements YarnWebParams {
"appattempt");
route(pajoin("/container", CONTAINER_ID), RmController.class, "container");
route("/errors-and-warnings", RmController.class, "errorsAndWarnings");
+ route(pajoin("/logaggregationstatus", APPLICATION_ID),
+ RmController.class, "logaggregationstatus");
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
index c8e3c5b..b124d75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
@@ -109,4 +109,8 @@ public class RmController extends Controller {
public void errorsAndWarnings() {
render(RMErrorsAndWarningsPage.class);
}
+
+ /**
+ * Controller action that renders {@link AppLogAggregationStatusPage}, the
+ * log aggregation status page of an application.
+ */
+ public void logaggregationstatus() {
+ render(AppLogAggregationStatusPage.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92431c96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index f8d92aa..a6e469e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -190,6 +191,11 @@ public abstract class MockAsm extends MockApps {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ /**
+ * Log aggregation reports are not provided by this mock application.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
public static RMApp newApplication(int i) {