You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/05/14 19:39:55 UTC
hadoop git commit: YARN-3505. Node's Log Aggregation Report with
SUCCEED should not cached in RMApps. Contributed by Xuan Gong. (cherry picked
from commit 15ccd967ee3e7046a50522089f67ba01f36ec76a)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 454236ec1 -> bc13c7d84
YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in RMApps. Contributed by Xuan Gong.
(cherry picked from commit 15ccd967ee3e7046a50522089f67ba01f36ec76a)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc13c7d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc13c7d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc13c7d8
Branch: refs/heads/branch-2
Commit: bc13c7d84bf4693c976ecdff7d69686705bf906c
Parents: 454236e
Author: Junping Du <ju...@apache.org>
Authored: Thu May 14 10:57:36 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu May 14 10:59:48 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../yarn/api/records/LogAggregationStatus.java | 2 +
.../hadoop/yarn/conf/YarnConfiguration.java | 10 +
.../src/main/proto/yarn_protos.proto | 1 +
.../src/main/resources/yarn-default.xml | 8 +
.../protocolrecords/LogAggregationReport.java | 16 +-
.../protocolrecords/NodeHeartbeatRequest.java | 7 +-
.../impl/pb/LogAggregationReportPBImpl.java | 40 ----
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 82 +++----
.../hadoop/yarn/server/webapp/AppBlock.java | 19 +-
.../yarn_server_common_service_protos.proto | 14 +-
.../nodemanager/NodeStatusUpdaterImpl.java | 46 +---
.../logaggregation/AppLogAggregatorImpl.java | 19 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 228 +++++++++++++++----
.../resourcemanager/rmnode/RMNodeImpl.java | 13 +-
.../rmnode/RMNodeStatusEvent.java | 11 +-
.../resourcemanager/webapp/RMAppBlock.java | 11 +-
.../webapp/RMAppLogAggregationStatusBlock.java | 37 ++-
.../TestRMAppLogAggregationStatus.java | 181 +++++++++++----
19 files changed, 469 insertions(+), 279 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 58e51c3..c642b12 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -63,6 +63,9 @@ Release 2.8.0 - UNRELEASED
YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
(Jonathan Eagles via zjshen)
+ YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in
+ RMApps. (Xuan Gong via junping_du)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
index da1230c..1e10972 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
@@ -34,6 +34,8 @@ public enum LogAggregationStatus {
/** Log Aggregation is Running. */
RUNNING,
+ /** Log Aggregation is Running, but has failures in previous cycles. */
+ RUNNING_WITH_FAILURE,
/**
* Log Aggregation is Succeeded. All of the logs have been aggregated
* successfully.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d677c59..7a05d13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -718,6 +718,16 @@ public class YarnConfiguration extends Configuration {
+ "proxy-user-privileges.enabled";
public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+ /**
+ * Maximum number of diagnostics/failure messages that can be saved in the
+ * RM for log aggregation. It also defines how many diagnostics/failure
+ * messages can be shown in the log aggregation web UI.
+ */
+ public static final String RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
+ RM_PREFIX + "max-log-aggregation-diagnostics-in-memory";
+ public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
+ 10;
+
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index c45081a..4095676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -204,6 +204,7 @@ enum LogAggregationStatusProto {
LOG_SUCCEEDED = 4;
LOG_FAILED = 5;
LOG_TIME_OUT = 6;
+ LOG_RUNNING_WITH_FAILURE = 7;
}
message ApplicationAttemptReportProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4d74f76..1dd88bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -674,6 +674,14 @@
<value>10</value>
</property>
+ <property>
+ <description>Number of diagnostics/failure messages that can be saved in
+ the RM for log aggregation. It also defines the number of diagnostics/failure
+ messages that can be shown in the log aggregation web UI.</description>
+ <name>yarn.resourcemanager.max-log-aggregation-diagnostics-in-memory</name>
+ <value>10</value>
+ </property>
+
<!-- Node Manager Configs -->
<property>
<description>The hostname of the NM.</description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
index b2270d8..d76f4cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.util.Records;
* It includes details such as:
* <ul>
* <li>{@link ApplicationId} of the application.</li>
- * <li>{@link NodeId} of the NodeManager.</li>
* <li>{@link LogAggregationStatus}</li>
* <li>Diagnostic information</li>
* </ul>
@@ -45,7 +43,7 @@ public abstract class LogAggregationReport {
@Public
@Unstable
public static LogAggregationReport newInstance(ApplicationId appId,
- NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+ LogAggregationStatus status, String diagnosticMessage) {
LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId);
report.setLogAggregationStatus(status);
@@ -66,18 +64,6 @@ public abstract class LogAggregationReport {
public abstract void setApplicationId(ApplicationId appId);
/**
- * Get the <code>NodeId</code>.
- * @return <code>NodeId</code>
- */
- @Public
- @Unstable
- public abstract NodeId getNodeId();
-
- @Public
- @Unstable
- public abstract void setNodeId(NodeId nodeId);
-
- /**
* Get the <code>LogAggregationStatus</code>.
* @return <code>LogAggregationStatus</code>
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index 227363f..767e4b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,10 +18,9 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@@ -54,9 +53,9 @@ public abstract class NodeHeartbeatRequest {
public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);
- public abstract Map<ApplicationId, LogAggregationReport>
+ public abstract List<LogAggregationReport>
getLogAggregationReportsForApps();
public abstract void setLogAggregationReportsForApps(
- Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
+ List<LogAggregationReport> logAggregationReportsForApps);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
index 75b6eab..ac6ad2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
@@ -22,13 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -45,7 +42,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
boolean viaProto = false;
private ApplicationId applicationId;
- private NodeId nodeId;
public LogAggregationReportPBImpl() {
builder = LogAggregationReportProto.newBuilder();
@@ -89,12 +85,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
builder.getApplicationId())) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
-
- if (this.nodeId != null
- && !((NodeIdPBImpl) this.nodeId).getProto().equals(
- builder.getNodeId())) {
- builder.setNodeId(convertToProtoFormat(this.nodeId));
- }
}
private void mergeLocalToProto() {
@@ -191,34 +181,4 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
}
builder.setDiagnostics(diagnosticMessage);
}
-
- @Override
- public NodeId getNodeId() {
- if (this.nodeId != null) {
- return this.nodeId;
- }
-
- LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasNodeId()) {
- return null;
- }
- this.nodeId = convertFromProtoFormat(p.getNodeId());
- return this.nodeId;
- }
-
- @Override
- public void setNodeId(NodeId nodeId) {
- maybeInitBuilder();
- if (nodeId == null)
- builder.clearNodeId();
- this.nodeId = nodeId;
- }
-
- private NodeIdProto convertToProtoFormat(NodeId t) {
- return ((NodeIdPBImpl) t).getProto();
- }
-
- private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
- return new NodeIdPBImpl(nodeId);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 03db39c..81f173d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,21 +18,16 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -51,9 +46,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null;
- private Map<ApplicationId, LogAggregationReport>
- logAggregationReportsForApps = null;
-
+ private List<LogAggregationReport> logAggregationReportsForApps = null;
+
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
}
@@ -110,12 +104,35 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private void addLogAggregationStatusForAppsToProto() {
maybeInitBuilder();
builder.clearLogAggregationReportsForApps();
- for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps
- .entrySet()) {
- builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
- .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
- .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+ if (this.logAggregationReportsForApps == null) {
+ return;
}
+ Iterable<LogAggregationReportProto> it =
+ new Iterable<LogAggregationReportProto>() {
+ @Override
+ public Iterator<LogAggregationReportProto> iterator() {
+ return new Iterator<LogAggregationReportProto>() {
+ private Iterator<LogAggregationReport> iter =
+ logAggregationReportsForApps.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public LogAggregationReportProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllLogAggregationReportsForApps(it);
}
private LogAggregationReportProto convertToProtoFormat(
@@ -246,17 +263,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
labels = new HashSet<String>(nodeLabels.getElementsList());
}
- private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
- return new ApplicationIdPBImpl(p);
- }
-
- private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
- return ((ApplicationIdPBImpl) t).getProto();
- }
-
@Override
- public Map<ApplicationId, LogAggregationReport>
- getLogAggregationReportsForApps() {
+ public List<LogAggregationReport> getLogAggregationReportsForApps() {
if (this.logAggregationReportsForApps != null) {
return this.logAggregationReportsForApps;
}
@@ -266,15 +274,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private void initLogAggregationReportsForApps() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
- List<LogAggregationReportsForAppsProto> list =
+ List<LogAggregationReportProto> list =
p.getLogAggregationReportsForAppsList();
- this.logAggregationReportsForApps =
- new HashMap<ApplicationId, LogAggregationReport>();
- for (LogAggregationReportsForAppsProto c : list) {
- ApplicationId appId = convertFromProtoFormat(c.getAppId());
- LogAggregationReport report =
- convertFromProtoFormat(c.getLogAggregationReport());
- this.logAggregationReportsForApps.put(appId, report);
+ this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
+ for (LogAggregationReportProto c : list) {
+ this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
}
}
@@ -285,14 +289,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
@Override
public void setLogAggregationReportsForApps(
- Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
- if (logAggregationStatusForApps == null
- || logAggregationStatusForApps.isEmpty()) {
- return;
+ List<LogAggregationReport> logAggregationStatusForApps) {
+ if(logAggregationStatusForApps == null) {
+ builder.clearLogAggregationReportsForApps();
}
- maybeInitBuilder();
- this.logAggregationReportsForApps =
- new HashMap<ApplicationId, LogAggregationReport>();
- this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+ this.logAggregationReportsForApps = logAggregationStatusForApps;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
index dd5a4c8..f46197e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
@@ -192,8 +193,17 @@ public class AppBlock extends HtmlBlock {
: "ApplicationMaster");
if (webUiType != null
&& webUiType.equals(YarnWebParams.RM_WEB_UI)) {
- overviewTable._("Log Aggregation Status",
- root_url("logaggregationstatus", app.getAppId()), "Status");
+ LogAggregationStatus status = getLogAggregationStatus();
+ if (status == null) {
+ overviewTable._("Log Aggregation Status", "N/A");
+ } else if (status == LogAggregationStatus.DISABLED
+ || status == LogAggregationStatus.NOT_START
+ || status == LogAggregationStatus.SUCCEEDED) {
+ overviewTable._("Log Aggregation Status", status.name());
+ } else {
+ overviewTable._("Log Aggregation Status",
+ root_url("logaggregationstatus", app.getAppId()), status.name());
+ }
}
overviewTable._("Diagnostics:",
app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
@@ -342,4 +352,9 @@ public class AppBlock extends HtmlBlock {
protected void createApplicationMetricsTable(Block html) {
}
+
+ // This will be overridden in RMAppBlock
+ protected LogAggregationStatus getLogAggregationStatus() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d34c9f7..c027ac0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -50,19 +50,13 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
optional StringArrayProto nodeLabels = 4;
- repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
-}
-
-message LogAggregationReportsForAppsProto {
- optional ApplicationIdProto appId = 1;
- optional LogAggregationReportProto log_aggregation_report = 2;
+ repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
}
message LogAggregationReportProto {
-optional ApplicationIdProto application_id = 1;
-optional NodeIdProto node_id = 2;
-optional LogAggregationStatusProto log_aggregation_status = 3;
-optional string diagnostics = 4 [default = "N/A"];
+ optional ApplicationIdProto application_id = 1;
+ optional LogAggregationStatusProto log_aggregation_status = 2;
+ optional string diagnostics = 3 [default = "N/A"];
}
message NodeHeartbeatResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 0eb7ff4..8046228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@@ -666,7 +665,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
- Map<ApplicationId, LogAggregationReport> logAggregationReports =
+ List<LogAggregationReport> logAggregationReports =
getLogAggregationReportsForApps(context
.getLogAggregationStatusForApps());
if (logAggregationReports != null
@@ -810,47 +809,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
statusUpdater.start();
}
- private Map<ApplicationId, LogAggregationReport>
- getLogAggregationReportsForApps(
- ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
- Map<ApplicationId, LogAggregationReport> latestLogAggregationReports =
- new HashMap<ApplicationId, LogAggregationReport>();
+ private List<LogAggregationReport> getLogAggregationReportsForApps(
+ ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
LogAggregationReport status;
while ((status = lastestLogAggregationStatus.poll()) != null) {
this.logAggregationReportForAppsTempList.add(status);
}
- for (LogAggregationReport logAggregationReport
- : this.logAggregationReportForAppsTempList) {
- LogAggregationReport report = null;
- if (latestLogAggregationReports.containsKey(logAggregationReport
- .getApplicationId())) {
- report =
- latestLogAggregationReports.get(logAggregationReport
- .getApplicationId());
- report.setLogAggregationStatus(logAggregationReport
- .getLogAggregationStatus());
- String message = report.getDiagnosticMessage();
- if (logAggregationReport.getDiagnosticMessage() != null
- && !logAggregationReport.getDiagnosticMessage().isEmpty()) {
- if (message != null) {
- message += logAggregationReport.getDiagnosticMessage();
- } else {
- message = logAggregationReport.getDiagnosticMessage();
- }
- report.setDiagnosticMessage(message);
- }
- } else {
- report = Records.newRecord(LogAggregationReport.class);
- report.setApplicationId(logAggregationReport.getApplicationId());
- report.setNodeId(this.nodeId);
- report.setLogAggregationStatus(logAggregationReport
- .getLogAggregationStatus());
- report
- .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage());
- }
- latestLogAggregationReports.put(logAggregationReport.getApplicationId(),
- report);
- }
- return latestLogAggregationReports;
+ List<LogAggregationReport> reports = new ArrayList<LogAggregationReport>();
+ reports.addAll(logAggregationReportForAppsTempList);
+ return reports;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 3111f10..dd2ab25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -306,6 +306,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
+ currentTime);
String diagnosticMessage = "";
+ boolean logAggregationSucceedInThisCycle = true;
final boolean rename = uploadedLogsInThisCycle;
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -338,20 +339,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
+ LogAggregationUtils.getNodeString(nodeId) + " at "
+ Times.format(currentTime) + "\n";
renameTemporaryLogFileFailed = true;
+ logAggregationSucceedInThisCycle = false;
}
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId);
- report.setNodeId(nodeId);
report.setDiagnosticMessage(diagnosticMessage);
+ report.setLogAggregationStatus(logAggregationSucceedInThisCycle
+ ? LogAggregationStatus.RUNNING
+ : LogAggregationStatus.RUNNING_WITH_FAILURE);
+ this.context.getLogAggregationStatusForApps().add(report);
if (appFinished) {
- report.setLogAggregationStatus(renameTemporaryLogFileFailed
+ // If the app is finished, one extra final report with log aggregation
+ // status SUCCEEDED/FAILED will be sent to RM to inform the RM
+ // that the log aggregation in this NM is completed.
+ LogAggregationReport finalReport =
+ Records.newRecord(LogAggregationReport.class);
+ finalReport.setApplicationId(appId);
+ finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed
? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
- } else {
- report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
+ this.context.getLogAggregationStatusForApps().add(finalReport);
}
- this.context.getLogAggregationStatusForApps().add(report);
} finally {
if (writer != null) {
writer.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 8abc478..f3dacd6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -22,12 +22,15 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -152,6 +156,13 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Map<NodeId, LogAggregationReport> logAggregationStatus =
new HashMap<NodeId, LogAggregationReport>();
private LogAggregationStatus logAggregationStatusForAppReport;
+ private int logAggregationSucceed = 0;
+ private int logAggregationFailed = 0;
+ private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
+ new HashMap<NodeId, List<String>>();
+ private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
+ new HashMap<NodeId, List<String>>();
+ private final int maxLogAggregationDiagnosticsInMemory;
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
@@ -437,6 +448,14 @@ public class RMAppImpl implements RMApp, Recoverable {
this.logAggregationEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ if (this.logAggregationEnabled) {
+ this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
+ } else {
+ this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
+ }
+ maxLogAggregationDiagnosticsInMemory = conf.getInt(
+ YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+ YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
}
@Override
@@ -834,10 +853,9 @@ public class RMAppImpl implements RMApp, Recoverable {
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
- LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
- .getNodeId(), app.logAggregationEnabled
- ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
- ""));
+ LogAggregationReport.newInstance(app.applicationId,
+ app.logAggregationEnabled ? LogAggregationStatus.NOT_START
+ : LogAggregationStatus.DISABLED, ""));
}
};
}
@@ -1401,18 +1419,20 @@ public class RMAppImpl implements RMApp, Recoverable {
Map<NodeId, LogAggregationReport> outputs =
new HashMap<NodeId, LogAggregationReport>();
outputs.putAll(logAggregationStatus);
- for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
- if (!output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.TIME_OUT)
- && !output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.SUCCEEDED)
- && !output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.FAILED)
- && isAppInFinalState(this)
- && System.currentTimeMillis() > this.logAggregationStartTime
- + this.logAggregationStatusTimeout) {
- output.getValue().setLogAggregationStatus(
- LogAggregationStatus.TIME_OUT);
+ if (!isLogAggregationFinished()) {
+ for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+ if (!output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.TIME_OUT)
+ && !output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.SUCCEEDED)
+ && !output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.FAILED)
+ && isAppInFinalState(this)
+ && System.currentTimeMillis() > this.logAggregationStartTime
+ + this.logAggregationStatusTimeout) {
+ output.getValue().setLogAggregationStatus(
+ LogAggregationStatus.TIME_OUT);
+ }
}
}
return outputs;
@@ -1424,32 +1444,46 @@ public class RMAppImpl implements RMApp, Recoverable {
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
try {
this.writeLock.lock();
- if (this.logAggregationEnabled) {
+ if (this.logAggregationEnabled && !isLogAggregationFinished()) {
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+ boolean stateChangedToFinal = false;
if (curReport == null) {
this.logAggregationStatus.put(nodeId, report);
+ if (isLogAggregationFinishedForNM(report)) {
+ stateChangedToFinal = true;
+ }
} else {
- if (curReport.getLogAggregationStatus().equals(
- LogAggregationStatus.TIME_OUT)) {
- if (report.getLogAggregationStatus().equals(
- LogAggregationStatus.SUCCEEDED)
- || report.getLogAggregationStatus().equals(
- LogAggregationStatus.FAILED)) {
- curReport.setLogAggregationStatus(report
- .getLogAggregationStatus());
+ if (isLogAggregationFinishedForNM(report)) {
+ if (!isLogAggregationFinishedForNM(curReport)) {
+ stateChangedToFinal = true;
}
- } else {
- curReport.setLogAggregationStatus(report.getLogAggregationStatus());
}
-
- if (report.getDiagnosticMessage() != null
- && !report.getDiagnosticMessage().isEmpty()) {
- curReport
- .setDiagnosticMessage(curReport.getDiagnosticMessage() == null
- ? report.getDiagnosticMessage() : curReport
- .getDiagnosticMessage() + report.getDiagnosticMessage());
+ if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
+ || curReport.getLogAggregationStatus() !=
+ LogAggregationStatus.RUNNING_WITH_FAILURE) {
+ if (curReport.getLogAggregationStatus()
+ == LogAggregationStatus.TIME_OUT
+ && report.getLogAggregationStatus()
+ == LogAggregationStatus.RUNNING) {
+ // If the log aggregation status got from latest nm heartbeat
+ // is Running, and current log aggregation status is TimeOut,
+ // based on whether there are any failure messages for this NM,
+ // we will reset the log aggregation status as RUNNING or
+ // RUNNING_WITH_FAILURE
+ if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
+ !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
+ report.setLogAggregationStatus(
+ LogAggregationStatus.RUNNING_WITH_FAILURE);
+ }
+ }
+ curReport.setLogAggregationStatus(report
+ .getLogAggregationStatus());
}
}
+ updateLogAggregationDiagnosticMessages(nodeId, report);
+ if (isAppInFinalState(this) && stateChangedToFinal) {
+ updateLogAggregationStatus(nodeId);
+ }
}
} finally {
this.writeLock.unlock();
@@ -1458,29 +1492,32 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public LogAggregationStatus getLogAggregationStatusForAppReport() {
- if (!logAggregationEnabled) {
- return LogAggregationStatus.DISABLED;
- }
- if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED
- || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) {
- return this.logAggregationStatusForAppReport;
- }
try {
this.readLock.lock();
+ if (! logAggregationEnabled) {
+ return LogAggregationStatus.DISABLED;
+ }
+ if (isLogAggregationFinished()) {
+ return this.logAggregationStatusForAppReport;
+ }
Map<NodeId, LogAggregationReport> reports =
getLogAggregationReportsForApp();
if (reports.size() == 0) {
- return null;
+ return this.logAggregationStatusForAppReport;
}
int logNotStartCount = 0;
int logCompletedCount = 0;
int logTimeOutCount = 0;
int logFailedCount = 0;
+ int logRunningWithFailure = 0;
for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
switch (report.getValue().getLogAggregationStatus()) {
case NOT_START:
logNotStartCount++;
break;
+ case RUNNING_WITH_FAILURE:
+ logRunningWithFailure ++;
+ break;
case SUCCEEDED:
logCompletedCount++;
break;
@@ -1506,19 +1543,122 @@ public class RMAppImpl implements RMApp, Recoverable {
// the log aggregation is finished. And the log aggregation status will
// not be updated anymore.
if (logFailedCount > 0 && isAppInFinalState(this)) {
- this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
return LogAggregationStatus.FAILED;
} else if (logTimeOutCount > 0) {
return LogAggregationStatus.TIME_OUT;
}
if (isAppInFinalState(this)) {
- this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
return LogAggregationStatus.SUCCEEDED;
}
+ } else if (logRunningWithFailure > 0) {
+ return LogAggregationStatus.RUNNING_WITH_FAILURE;
}
return LogAggregationStatus.RUNNING;
} finally {
this.readLock.unlock();
}
}
+
+ private boolean isLogAggregationFinished() {
+ return this.logAggregationStatusForAppReport
+ .equals(LogAggregationStatus.SUCCEEDED)
+ || this.logAggregationStatusForAppReport
+ .equals(LogAggregationStatus.FAILED);
+
+ }
+
+ private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
+ return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
+ || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
+ }
+
+ private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
+ LogAggregationReport report) {
+ if (report.getDiagnosticMessage() != null
+ && !report.getDiagnosticMessage().isEmpty()) {
+ if (report.getLogAggregationStatus()
+ == LogAggregationStatus.RUNNING ) {
+ List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
+ if (diagnostics == null) {
+ diagnostics = new ArrayList<String>();
+ logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
+ } else {
+ if (diagnostics.size()
+ == maxLogAggregationDiagnosticsInMemory) {
+ diagnostics.remove(0);
+ }
+ }
+ diagnostics.add(report.getDiagnosticMessage());
+ this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
+ StringUtils.join(diagnostics, "\n"));
+ } else if (report.getLogAggregationStatus()
+ == LogAggregationStatus.RUNNING_WITH_FAILURE) {
+ List<String> failureMessages =
+ logAggregationFailureMessagesForNMs.get(nodeId);
+ if (failureMessages == null) {
+ failureMessages = new ArrayList<String>();
+ logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
+ } else {
+ if (failureMessages.size()
+ == maxLogAggregationDiagnosticsInMemory) {
+ failureMessages.remove(0);
+ }
+ }
+ failureMessages.add(report.getDiagnosticMessage());
+ }
+ }
+ }
+
+ private void updateLogAggregationStatus(NodeId nodeId) {
+ LogAggregationStatus status =
+ this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
+ if (status.equals(LogAggregationStatus.SUCCEEDED)) {
+ this.logAggregationSucceed++;
+ } else if (status.equals(LogAggregationStatus.FAILED)) {
+ this.logAggregationFailed++;
+ }
+ if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
+ this.logAggregationStatusForAppReport =
+ LogAggregationStatus.SUCCEEDED;
+ // Since the log aggregation status for this application for all NMs
+ // is SUCCEEDED, it means all logs are aggregated successfully.
+ // We could remove all the cached log aggregation reports
+ this.logAggregationStatus.clear();
+ this.logAggregationDiagnosticsForNMs.clear();
+ this.logAggregationFailureMessagesForNMs.clear();
+ } else if (this.logAggregationSucceed + this.logAggregationFailed
+ == this.logAggregationStatus.size()) {
+ this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
+ // We have collected the log aggregation status for all NMs.
+ // The log aggregation status is FAILED which means the log
+ // aggregation fails in some NMs. We are only interested in the
+ // nodes where the log aggregation is failed. So we could remove
+ // the log aggregation details for those succeeded NMs
+ for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
+ this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<NodeId, LogAggregationReport> entry = it.next();
+ if (entry.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.SUCCEEDED)) {
+ it.remove();
+ }
+ }
+ // the log aggregation has finished/failed.
+ // and the status will not be updated anymore.
+ this.logAggregationDiagnosticsForNMs.clear();
+ }
+ }
+
+ public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+ try {
+ this.readLock.lock();
+ List<String> failureMessages =
+ this.logAggregationFailureMessagesForNMs.get(nodeId);
+ if (failureMessages == null || failureMessages.isEmpty()) {
+ return StringUtils.EMPTY;
+ }
+ return StringUtils.join(failureMessages, "\n");
+ } finally {
+ this.readLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3be1867..a11aacf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -777,7 +775,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.handleContainerStatus(statusEvent.getContainers());
- Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+ List<LogAggregationReport> logAggregationReportsForApps =
statusEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) {
@@ -915,12 +913,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
private void handleLogAggregationStatus(
- Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
- for (Entry<ApplicationId, LogAggregationReport> report :
- logAggregationReportsForApps.entrySet()) {
- RMApp rmApp = this.context.getRMApps().get(report.getKey());
+ List<LogAggregationReport> logAggregationReportsForApps) {
+ for (LogAggregationReport report : logAggregationReportsForApps) {
+ RMApp rmApp = this.context.getRMApps().get(report.getApplicationId());
if (rmApp != null) {
- ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue());
+ ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index 4bbf610..b95d7d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -34,7 +32,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
private final List<ContainerStatus> containersCollection;
private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
- private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
+ private List<LogAggregationReport> logAggregationReportsForApps;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -50,7 +48,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
NodeHeartbeatResponse latestResponse,
- Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+ List<LogAggregationReport> logAggregationReportsForApps) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
@@ -75,13 +73,12 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.keepAliveAppIds;
}
- public Map<ApplicationId, LogAggregationReport>
- getLogAggregationReportsForApps() {
+ public List<LogAggregationReport> getLogAggregationReportsForApps() {
return this.logAggregationReportsForApps;
}
public void setLogAggregationReportsForApps(
- Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+ List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
index 43e26be..38e0e3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.AppBlock;
-import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@@ -170,4 +170,13 @@ public class RMAppBlock extends AppBlock{
tbody._()._();
}
+
+ @Override
+ protected LogAggregationStatus getLogAggregationStatus() {
+ RMApp rmApp = this.rm.getRMContext().getRMApps().get(appID);
+ if (rmApp == null) {
+ return null;
+ }
+ return rmApp.getLogAggregationStatusForAppReport();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
index a2f61e3..f7f7c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@@ -93,6 +94,9 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
.td("Log Aggregation does not Start.")._();
table_description.tr().td(LogAggregationStatus.RUNNING.name())
.td("Log Aggregation is Running.")._();
+ table_description.tr().td(LogAggregationStatus.RUNNING_WITH_FAILURE.name())
+ .td("Log Aggregation is Running, but has failures "
+ + "in previous cycles")._();
table_description.tr().td(LogAggregationStatus.SUCCEEDED.name())
.td("Log Aggregation is Succeeded. All of the logs have been "
+ "aggregated successfully.")._();
@@ -106,24 +110,29 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
table_description._();
div_description._();
- boolean logAggregationEnabled =
- conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
// Application Log aggregation status Table
DIV<Hamlet> div = html.div(_INFO_WRAP);
TABLE<DIV<Hamlet>> table =
div.h3(
"Log Aggregation: "
- + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+ + (rmApp == null ? "N/A" : rmApp
+ .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp
+ .getLogAggregationStatusForAppReport().name())).table(
"#LogAggregationStatus");
- table.
- tr().
- th(_TH, "NodeId").
- th(_TH, "Log Aggregation Status").
- th(_TH, "Diagnostis Message").
- _();
- RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+ int maxLogAggregationDiagnosticsInMemory = conf.getInt(
+ YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+ YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+ table
+ .tr()
+ .th(_TH, "NodeId")
+ .th(_TH, "Log Aggregation Status")
+ .th(_TH, "Last "
+ + maxLogAggregationDiagnosticsInMemory + " Diagnostic Messages")
+ .th(_TH, "Last "
+ + maxLogAggregationDiagnosticsInMemory + " Failure Messages")._();
+
if (rmApp != null) {
Map<NodeId, LogAggregationReport> logAggregationReports =
rmApp.getLogAggregationReportsForApp();
@@ -136,10 +145,14 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
String message =
report.getValue() == null ? null : report.getValue()
.getDiagnosticMessage();
+ String failureMessage =
+ report.getValue() == null ? null : ((RMAppImpl)rmApp)
+ .getLogAggregationFailureMessagesForNM(report.getKey());
table.tr()
.td(report.getKey().toString())
.td(status == null ? "N/A" : status.toString())
- .td(message == null ? "N/A" : message)._();
+ .td(message == null ? "N/A" : message)
+ .td(failureMessage == null ? "N/A" : failureMessage)._();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc13c7d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 4eec63f..9af4290 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -23,7 +23,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -155,26 +155,26 @@ public class TestRMAppLogAggregationStatus {
.getLogAggregationStatus());
}
- Map<ApplicationId, LogAggregationReport> node1ReportForApp =
- new HashMap<ApplicationId, LogAggregationReport>();
+ List<LogAggregationReport> node1ReportForApp =
+ new ArrayList<LogAggregationReport>();
String messageForNode1_1 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.RUNNING, messageForNode1_1);
- node1ReportForApp.put(appId, report1);
+ LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
+ messageForNode1_1);
+ node1ReportForApp.add(report1);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp));
- Map<ApplicationId, LogAggregationReport> node2ReportForApp =
- new HashMap<ApplicationId, LogAggregationReport>();
+ List<LogAggregationReport> node2ReportForApp =
+ new ArrayList<LogAggregationReport>();
String messageForNode2_1 =
"node2 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report2 =
- LogAggregationReport.newInstance(appId, nodeId2,
+ LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode2_1);
- node2ReportForApp.put(appId, report2);
+ node2ReportForApp.add(report2);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node2ReportForApp));
@@ -205,14 +205,14 @@ public class TestRMAppLogAggregationStatus {
}
// node1 updates its log aggregation status again
- Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
- new HashMap<ApplicationId, LogAggregationReport>();
+ List<LogAggregationReport> node1ReportForApp2 =
+ new ArrayList<LogAggregationReport>();
String messageForNode1_2 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1_2 =
- LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2);
- node1ReportForApp2.put(appId, report1_2);
+ node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp2));
@@ -230,8 +230,9 @@ public class TestRMAppLogAggregationStatus {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
- .getValue().getDiagnosticMessage());
+ Assert.assertEquals(
+ messageForNode1_1 + "\n" + messageForNode1_2, report
+ .getValue().getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
@@ -268,15 +269,19 @@ public class TestRMAppLogAggregationStatus {
// Finally, node1 finished its log aggregation and sent out its final
// log aggregation status. The log aggregation status for node1 should
// be changed from TIME_OUT to SUCCEEDED
- Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
- new HashMap<ApplicationId, LogAggregationReport>();
- String messageForNode1_3 =
- "node1 final logAggregation status updated at "
- + System.currentTimeMillis();
- LogAggregationReport report1_3 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.SUCCEEDED, messageForNode1_3);
- node1ReportForApp3.put(appId, report1_3);
+ List<LogAggregationReport> node1ReportForApp3 =
+ new ArrayList<LogAggregationReport>();
+ LogAggregationReport report1_3;
+ for (int i = 0; i < 10 ; i ++) {
+ report1_3 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING, "test_message_" + i);
+ node1ReportForApp3.add(report1_3);
+ }
+ node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.SUCCEEDED, ""));
+ // For every logAggregationReport cached in memory, we can only save at most
+ // 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp3));
@@ -290,8 +295,14 @@ public class TestRMAppLogAggregationStatus {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1 + messageForNode1_2
- + messageForNode1_3, report.getValue().getDiagnosticMessage());
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < 9; i ++) {
+ builder.append("test_message_" + i);
+ builder.append("\n");
+ }
+ builder.append("test_message_" + 9);
+ Assert.assertEquals(builder.toString(), report.getValue()
+ .getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
.getLogAggregationStatus());
@@ -301,6 +312,32 @@ public class TestRMAppLogAggregationStatus {
.fail("should not contain log aggregation report for other nodes");
}
}
+
+ // update log aggregationStatus for node2 as FAILED,
+ // so the log aggregation status for the App will become FAILED,
+ // and we only keep the log aggregation reports whose status is FAILED,
+ // so the log aggregation report for node1 will be removed.
+ List<LogAggregationReport> node2ReportForApp2 =
+ new ArrayList<LogAggregationReport>();
+ LogAggregationReport report2_2 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message");
+ LogAggregationReport report2_3 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.FAILED, "");
+ node2ReportForApp2.add(report2_2);
+ node2ReportForApp2.add(report2_3);
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node2ReportForApp2));
+ Assert.assertEquals(LogAggregationStatus.FAILED,
+ rmApp.getLogAggregationStatusForAppReport());
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertTrue(logAggregationStatus.size() == 1);
+ Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
+ Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
+ Assert.assertEquals("Fail_Message",
+ ((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2));
}
@Test (timeout = 10000)
@@ -317,9 +354,11 @@ public class TestRMAppLogAggregationStatus {
// Enable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
rmApp = (RMAppImpl)createRMApp(conf);
- // If we do not know any NodeManagers for this application ,
- // the log aggregation status will return null
- Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
+ // If we do not know any NodeManagers for this application, and
+ // the log aggregation is enabled, the log aggregation status will
+ // return NOT_START
+ Assert.assertEquals(LogAggregationStatus.NOT_START,
+ rmApp.getLogAggregationStatusForAppReport());
NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
@@ -329,24 +368,24 @@ public class TestRMAppLogAggregationStatus {
// If the log aggregation status for all NMs are NOT_START,
// the log aggregation status for this app will return NOT_START
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
@@ -357,13 +396,13 @@ public class TestRMAppLogAggregationStatus {
// others are SUCCEEDED, the log aggregation status for this app will
// return TIME_OUT
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport());
@@ -371,17 +410,59 @@ public class TestRMAppLogAggregationStatus {
// is at the final state, the log aggregation status for this app will
// return SUCCEEDED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
+ // If the log aggregation status for at least one of the NMs is RUNNING,
+ // the log aggregation status for this app will return RUNNING
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING,
+ rmApp.getLogAggregationStatusForAppReport());
+
+ // If the log aggregation status for at least one of the NMs
+ // is RUNNING_WITH_FAILURE, the log aggregation status
+ // for this app will return RUNNING_WITH_FAILURE
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE,
+ ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+ rmApp.getLogAggregationStatusForAppReport());
+
+ // For node4, the previous log aggregation status is RUNNING_WITH_FAILURE,
+ // it will not be changed even if it gets a new log aggregation status
+ // as RUNNING
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+ rmApp.getLogAggregationStatusForAppReport());
+
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is FAILED,
@@ -389,13 +470,13 @@ public class TestRMAppLogAggregationStatus {
// at the final state, the log aggregation status for this app
// will return FAILED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());