You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/08/16 10:16:13 UTC
[hadoop] branch trunk updated: YARN-8586. Extract log aggregation
related fields and methods from RMAppImpl. Contributed by Peter Bacsko
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4456ea6 YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko
4456ea6 is described below
commit 4456ea67b949553b85e101e866b4b3f4b335f1f0
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Aug 16 11:36:14 2019 +0200
YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko
---
.../server/resourcemanager/rmapp/RMAppImpl.java | 315 ++---------------
.../resourcemanager/rmapp/RMAppLogAggregation.java | 383 +++++++++++++++++++++
2 files changed, 406 insertions(+), 292 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 3f9f9c8..d25dddc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -19,24 +19,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -182,19 +177,7 @@ public class RMAppImpl implements RMApp, Recoverable {
new AppFinishedTransition();
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
- private final boolean logAggregationEnabled;
- private long logAggregationStartTime = 0;
- private final long logAggregationStatusTimeout;
- private final Map<NodeId, LogAggregationReport> logAggregationStatus =
- new ConcurrentHashMap<NodeId, LogAggregationReport>();
- private volatile LogAggregationStatus logAggregationStatusForAppReport;
- private int logAggregationSucceed = 0;
- private int logAggregationFailed = 0;
- private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
- new HashMap<NodeId, List<String>>();
- private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
- new HashMap<NodeId, List<String>>();
- private final int maxLogAggregationDiagnosticsInMemory;
+ private final RMAppLogAggregation logAggregation;
private Map<ApplicationTimeoutType, Long> applicationTimeouts =
new HashMap<ApplicationTimeoutType, Long>();
@@ -511,26 +494,7 @@ public class RMAppImpl implements RMApp, Recoverable {
applicationSchedulingEnvs
.putAll(submissionContext.getApplicationSchedulingPropertiesMap());
- long localLogAggregationStatusTimeout =
- conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
- if (localLogAggregationStatusTimeout <= 0) {
- this.logAggregationStatusTimeout =
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
- } else {
- this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
- }
- this.logAggregationEnabled =
- conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
- if (this.logAggregationEnabled) {
- this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
- } else {
- this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
- }
- maxLogAggregationDiagnosticsInMemory = conf.getInt(
- YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
- YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+ this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);
// amBlacklistingEnabled can be configured globally
// Just use the global values
@@ -1090,13 +1054,9 @@ public class RMAppImpl implements RMApp, Recoverable {
// otherwise, add it to ranNodes for further process
app.ranNodes.add(nodeAddedEvent.getNodeId());
- if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
- app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
- LogAggregationReport.newInstance(app.applicationId,
- app.logAggregationEnabled ? LogAggregationStatus.NOT_START
- : LogAggregationStatus.DISABLED, ""));
- }
- };
+ app.logAggregation.addReportIfNecessary(
+ nodeAddedEvent.getNodeId(), app.getApplicationId());
+ }
}
// synchronously recover attempt to ensure any incoming external events
@@ -1530,13 +1490,13 @@ public class RMAppImpl implements RMApp, Recoverable {
finalState));
}
- // Send app completed event to AppManager
app.handler.handle(new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
}
private void handleAppFinished(RMAppImpl app) {
- app.logAggregationStartTime = app.systemClock.getTime();
+ app.logAggregation
+ .recordLogAggregationStartTime(app.systemClock.getTime());
// record finish time
app.finishTime = app.storedFinishTime;
if (app.finishTime == 0) {
@@ -1778,263 +1738,31 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
- this.readLock.lock();
- try {
- if (!isLogAggregationFinished() && isAppInFinalState(this) &&
- systemClock.getTime() > this.logAggregationStartTime
- + this.logAggregationStatusTimeout) {
- for (Entry<NodeId, LogAggregationReport> output :
- logAggregationStatus.entrySet()) {
- if (!output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.TIME_OUT)
- && !output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.SUCCEEDED)
- && !output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.FAILED)) {
- output.getValue().setLogAggregationStatus(
- LogAggregationStatus.TIME_OUT);
- }
- }
- }
- return Collections.unmodifiableMap(logAggregationStatus);
- } finally {
- this.readLock.unlock();
- }
+ return logAggregation.getLogAggregationReportsForApp(this);
}
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
- this.writeLock.lock();
- try {
- if (this.logAggregationEnabled && !isLogAggregationFinished()) {
- LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
- boolean stateChangedToFinal = false;
- if (curReport == null) {
- this.logAggregationStatus.put(nodeId, report);
- if (isLogAggregationFinishedForNM(report)) {
- stateChangedToFinal = true;
- }
- } else {
- if (isLogAggregationFinishedForNM(report)) {
- if (!isLogAggregationFinishedForNM(curReport)) {
- stateChangedToFinal = true;
- }
- }
- if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
- || curReport.getLogAggregationStatus() !=
- LogAggregationStatus.RUNNING_WITH_FAILURE) {
- if (curReport.getLogAggregationStatus()
- == LogAggregationStatus.TIME_OUT
- && report.getLogAggregationStatus()
- == LogAggregationStatus.RUNNING) {
- // If the log aggregation status got from latest NM heartbeat
- // is RUNNING, and current log aggregation status is TIME_OUT,
- // based on whether there are any failure messages for this NM,
- // we will reset the log aggregation status as RUNNING or
- // RUNNING_WITH_FAILURE
- if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
- !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
- report.setLogAggregationStatus(
- LogAggregationStatus.RUNNING_WITH_FAILURE);
- }
- }
- curReport.setLogAggregationStatus(report
- .getLogAggregationStatus());
- }
- }
- updateLogAggregationDiagnosticMessages(nodeId, report);
- if (isAppInFinalState(this) && stateChangedToFinal) {
- updateLogAggregationStatus(nodeId);
- }
- }
- } finally {
- this.writeLock.unlock();
- }
+ logAggregation.aggregateLogReport(nodeId, report, this);
}
@Override
- public LogAggregationStatus getLogAggregationStatusForAppReport() {
- this.readLock.lock();
- try {
- if (! logAggregationEnabled) {
- return LogAggregationStatus.DISABLED;
- }
- if (isLogAggregationFinished()) {
- return this.logAggregationStatusForAppReport;
- }
- Map<NodeId, LogAggregationReport> reports =
- getLogAggregationReportsForApp();
- if (reports.size() == 0) {
- return this.logAggregationStatusForAppReport;
- }
- int logNotStartCount = 0;
- int logCompletedCount = 0;
- int logTimeOutCount = 0;
- int logFailedCount = 0;
- int logRunningWithFailure = 0;
- for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
- switch (report.getValue().getLogAggregationStatus()) {
- case NOT_START:
- logNotStartCount++;
- break;
- case RUNNING_WITH_FAILURE:
- logRunningWithFailure ++;
- break;
- case SUCCEEDED:
- logCompletedCount++;
- break;
- case FAILED:
- logFailedCount++;
- logCompletedCount++;
- break;
- case TIME_OUT:
- logTimeOutCount++;
- logCompletedCount++;
- break;
- default:
- break;
- }
- }
- if (logNotStartCount == reports.size()) {
- return LogAggregationStatus.NOT_START;
- } else if (logCompletedCount == reports.size()) {
- // We should satisfy two condition in order to return SUCCEEDED or FAILED
- // 1) make sure the application is in final state
- // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
- // The SUCCEEDED/FAILED status is the final status which means
- // the log aggregation is finished. And the log aggregation status will
- // not be updated anymore.
- if (logFailedCount > 0 && isAppInFinalState(this)) {
- this.logAggregationStatusForAppReport =
- LogAggregationStatus.FAILED;
- return LogAggregationStatus.FAILED;
- } else if (logTimeOutCount > 0) {
- this.logAggregationStatusForAppReport =
- LogAggregationStatus.TIME_OUT;
- return LogAggregationStatus.TIME_OUT;
- }
- if (isAppInFinalState(this)) {
- this.logAggregationStatusForAppReport =
- LogAggregationStatus.SUCCEEDED;
- return LogAggregationStatus.SUCCEEDED;
- }
- } else if (logRunningWithFailure > 0) {
- return LogAggregationStatus.RUNNING_WITH_FAILURE;
- }
- return LogAggregationStatus.RUNNING;
- } finally {
- this.readLock.unlock();
- }
+ public boolean isLogAggregationFinished() {
+ return logAggregation.isFinished();
}
@Override
public boolean isLogAggregationEnabled() {
- return logAggregationEnabled;
+ return logAggregation.isEnabled();
}
- @Override
- public boolean isLogAggregationFinished() {
- return this.logAggregationStatusForAppReport
- .equals(LogAggregationStatus.SUCCEEDED)
- || this.logAggregationStatusForAppReport
- .equals(LogAggregationStatus.FAILED)
- || this.logAggregationStatusForAppReport
- .equals(LogAggregationStatus.TIME_OUT);
-
- }
-
- private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
- return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
- || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
- }
-
- private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
- LogAggregationReport report) {
- if (report.getDiagnosticMessage() != null
- && !report.getDiagnosticMessage().isEmpty()) {
- if (report.getLogAggregationStatus()
- == LogAggregationStatus.RUNNING ) {
- List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
- if (diagnostics == null) {
- diagnostics = new ArrayList<String>();
- logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
- } else {
- if (diagnostics.size()
- == maxLogAggregationDiagnosticsInMemory) {
- diagnostics.remove(0);
- }
- }
- diagnostics.add(report.getDiagnosticMessage());
- this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
- StringUtils.join(diagnostics, "\n"));
- } else if (report.getLogAggregationStatus()
- == LogAggregationStatus.RUNNING_WITH_FAILURE) {
- List<String> failureMessages =
- logAggregationFailureMessagesForNMs.get(nodeId);
- if (failureMessages == null) {
- failureMessages = new ArrayList<String>();
- logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
- } else {
- if (failureMessages.size()
- == maxLogAggregationDiagnosticsInMemory) {
- failureMessages.remove(0);
- }
- }
- failureMessages.add(report.getDiagnosticMessage());
- }
- }
- }
-
- private void updateLogAggregationStatus(NodeId nodeId) {
- LogAggregationStatus status =
- this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
- if (status.equals(LogAggregationStatus.SUCCEEDED)) {
- this.logAggregationSucceed++;
- } else if (status.equals(LogAggregationStatus.FAILED)) {
- this.logAggregationFailed++;
- }
- if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
- this.logAggregationStatusForAppReport =
- LogAggregationStatus.SUCCEEDED;
- // Since the log aggregation status for this application for all NMs
- // is SUCCEEDED, it means all logs are aggregated successfully.
- // We could remove all the cached log aggregation reports
- this.logAggregationStatus.clear();
- this.logAggregationDiagnosticsForNMs.clear();
- this.logAggregationFailureMessagesForNMs.clear();
- } else if (this.logAggregationSucceed + this.logAggregationFailed
- == this.logAggregationStatus.size()) {
- this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
- // We have collected the log aggregation status for all NMs.
- // The log aggregation status is FAILED which means the log
- // aggregation fails in some NMs. We are only interested in the
- // nodes where the log aggregation is failed. So we could remove
- // the log aggregation details for those succeeded NMs
- for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
- this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
- Map.Entry<NodeId, LogAggregationReport> entry = it.next();
- if (entry.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.SUCCEEDED)) {
- it.remove();
- }
- }
- // the log aggregation has finished/failed.
- // and the status will not be updated anymore.
- this.logAggregationDiagnosticsForNMs.clear();
- }
+ public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+ return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
}
- public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
- this.readLock.lock();
- try {
- List<String> failureMessages =
- this.logAggregationFailureMessagesForNMs.get(nodeId);
- if (failureMessages == null || failureMessages.isEmpty()) {
- return StringUtils.EMPTY;
- }
- return StringUtils.join(failureMessages, "\n");
- } finally {
- this.readLock.unlock();
- }
+ @Override
+ public LogAggregationStatus getLogAggregationStatusForAppReport() {
+ return logAggregation
+ .getLogAggregationStatusForAppReport(this);
}
@Override
@@ -2153,8 +1881,11 @@ public class RMAppImpl implements RMApp, Recoverable {
}
@VisibleForTesting
- public long getLogAggregationStartTime() {
- return logAggregationStartTime;
+ long getLogAggregationStartTime() {
+ return logAggregation.getLogAggregationStartTime();
}
+ Clock getSystemClock() {
+ return systemClock;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java
new file mode 100644
index 0000000..b4409ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
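+/**
+ * Log aggregation logic used by RMApp. Keeps the per-node log aggregation
+ * reports of one application, the diagnostic and failure messages received
+ * with them, and the overall status that is returned for the app report.
+ */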
+public class RMAppLogAggregation {
+ private final boolean logAggregationEnabled;
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+ private long logAggregationStartTime = 0;
+ private final long logAggregationStatusTimeout;
+ private final Map<NodeId, LogAggregationReport> logAggregationStatus =
+ new ConcurrentHashMap<>();
+ private volatile LogAggregationStatus logAggregationStatusForAppReport;
+ private int logAggregationSucceed = 0;
+ private int logAggregationFailed = 0;
+ private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
+ new HashMap<>();
+ private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
+ new HashMap<>();
+ private final int maxLogAggregationDiagnosticsInMemory;
+
+ RMAppLogAggregation(Configuration conf, ReadLock readLock,
+ WriteLock writeLock) {
+ this.readLock = readLock;
+ this.writeLock = writeLock;
+ this.logAggregationStatusTimeout = getLogAggregationStatusTimeout(conf);
+ this.logAggregationEnabled = getEnabledFlagFromConf(conf);
+ this.logAggregationStatusForAppReport =
+ this.logAggregationEnabled ? LogAggregationStatus.NOT_START :
+ LogAggregationStatus.DISABLED;
+ this.maxLogAggregationDiagnosticsInMemory =
+ getMaxLogAggregationDiagnostics(conf);
+ }
+
+ private long getLogAggregationStatusTimeout(Configuration conf) {
+ long statusTimeout =
+ conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+ if (statusTimeout <= 0) {
+ return YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+ } else {
+ return statusTimeout;
+ }
+ }
+
+ private boolean getEnabledFlagFromConf(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ }
+
+ private int getMaxLogAggregationDiagnostics(Configuration conf) {
+ return conf.getInt(
+ YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+ YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+ }
+
+ Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp(
+ RMAppImpl rmApp) {
+ this.readLock.lock();
+ try {
+ if (!isLogAggregationFinished() && RMAppImpl.isAppInFinalState(rmApp) &&
+ rmApp.getSystemClock().getTime() > this.logAggregationStartTime
+ + this.logAggregationStatusTimeout) {
+ for (Map.Entry<NodeId, LogAggregationReport> output :
+ logAggregationStatus.entrySet()) {
+ if (!output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.TIME_OUT)
+ && !output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.SUCCEEDED)
+ && !output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.FAILED)) {
+ output.getValue().setLogAggregationStatus(
+ LogAggregationStatus.TIME_OUT);
+ }
+ }
+ }
+ return Collections.unmodifiableMap(logAggregationStatus);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+  * Merges a log aggregation report received for a node into the state kept
+  * for the application. The report is ignored when log aggregation is
+  * disabled or the application-level status is already final. The whole
+  * merge runs while holding the write lock.
+  *
+  * @param nodeId node the report belongs to
+  * @param report report to merge; its status may be rewritten to
+  *          RUNNING_WITH_FAILURE by this method
+  * @param rmApp application used for the final-state check
+  */
+ void aggregateLogReport(NodeId nodeId, LogAggregationReport report,
+ RMAppImpl rmApp) {
+ this.writeLock.lock();
+ try {
+ if (this.logAggregationEnabled && !isLogAggregationFinished()) {
+ LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+ // Set when this report moves the node into SUCCEEDED or FAILED.
+ boolean stateChangedToFinal = false;
+ if (curReport == null) {
+ // First report for this node: store it as-is.
+ this.logAggregationStatus.put(nodeId, report);
+ if (isLogAggregationFinishedForNM(report)) {
+ stateChangedToFinal = true;
+ }
+ } else {
+ if (isLogAggregationFinishedForNM(report)) {
+ if (!isLogAggregationFinishedForNM(curReport)) {
+ stateChangedToFinal = true;
+ }
+ }
+ // Copy the new status unless that would replace a stored
+ // RUNNING_WITH_FAILURE with a plain RUNNING.
+ if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
+ || curReport.getLogAggregationStatus() !=
+ LogAggregationStatus.RUNNING_WITH_FAILURE) {
+ if (curReport.getLogAggregationStatus()
+ == LogAggregationStatus.TIME_OUT
+ && report.getLogAggregationStatus()
+ == LogAggregationStatus.RUNNING) {
+ // If the log aggregation status got from latest NM heartbeat
+ // is RUNNING, and current log aggregation status is TIME_OUT,
+ // based on whether there are any failure messages for this NM,
+ // we will reset the log aggregation status as RUNNING or
+ // RUNNING_WITH_FAILURE
+ if (isThereFailureMessageForNM(nodeId)) {
+ report.setLogAggregationStatus(
+ LogAggregationStatus.RUNNING_WITH_FAILURE);
+ }
+ }
+ curReport.setLogAggregationStatus(report
+ .getLogAggregationStatus());
+ }
+ }
+ updateLogAggregationDiagnosticMessages(nodeId, report);
+ // The application-level counters are only updated once the application
+ // itself is in a final state.
+ if (RMAppImpl.isAppInFinalState(rmApp) && stateChangedToFinal) {
+ updateLogAggregationStatus(nodeId);
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+  * Computes the application-level log aggregation status from the per-node
+  * reports. Returns DISABLED when log aggregation is not enabled, and the
+  * stored status when it is already final or when there are no node reports.
+  * Otherwise the node statuses are counted and combined as described by the
+  * comments below.
+  *
+  * @param rmApp application used for the final-state check and passed on to
+  *          getLogAggregationReportsForApp
+  * @return the status to show in the application report
+  */
+ public LogAggregationStatus getLogAggregationStatusForAppReport(
+ RMAppImpl rmApp) {
+ // Evaluated before the read lock is taken.
+ boolean appInFinalState = RMAppImpl.isAppInFinalState(rmApp);
+ this.readLock.lock();
+ try {
+ if (!logAggregationEnabled) {
+ return LogAggregationStatus.DISABLED;
+ }
+ if (isLogAggregationFinished()) {
+ return this.logAggregationStatusForAppReport;
+ }
+ // This call may switch unfinished node reports to TIME_OUT.
+ Map<NodeId, LogAggregationReport> reports =
+ getLogAggregationReportsForApp(rmApp);
+ if (reports.size() == 0) {
+ return this.logAggregationStatusForAppReport;
+ }
+ int logNotStartCount = 0;
+ int logCompletedCount = 0;
+ int logTimeOutCount = 0;
+ int logFailedCount = 0;
+ int logRunningWithFailure = 0;
+ // FAILED and TIME_OUT are also counted as completed.
+ for (Map.Entry<NodeId, LogAggregationReport> report :
+ reports.entrySet()) {
+ switch (report.getValue().getLogAggregationStatus()) {
+ case NOT_START:
+ logNotStartCount++;
+ break;
+ case RUNNING_WITH_FAILURE:
+ logRunningWithFailure ++;
+ break;
+ case SUCCEEDED:
+ logCompletedCount++;
+ break;
+ case FAILED:
+ logFailedCount++;
+ logCompletedCount++;
+ break;
+ case TIME_OUT:
+ logTimeOutCount++;
+ logCompletedCount++;
+ break;
+ default:
+ break;
+ }
+ }
+ if (logNotStartCount == reports.size()) {
+ return LogAggregationStatus.NOT_START;
+ } else if (logCompletedCount == reports.size()) {
+ // We should satisfy two condition in order to return
+ // SUCCEEDED or FAILED.
+ // 1) make sure the application is in final state
+ // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
+ // The SUCCEEDED/FAILED status is the final status which means
+ // the log aggregation is finished. And the log aggregation status will
+ // not be updated anymore.
+ // NOTE: the stored status is assigned below while only the read lock is
+ // held; the field is declared volatile.
+ if (logFailedCount > 0 && appInFinalState) {
+ this.logAggregationStatusForAppReport =
+ LogAggregationStatus.FAILED;
+ return LogAggregationStatus.FAILED;
+ } else if (logTimeOutCount > 0) {
+ this.logAggregationStatusForAppReport =
+ LogAggregationStatus.TIME_OUT;
+ return LogAggregationStatus.TIME_OUT;
+ }
+ if (appInFinalState) {
+ this.logAggregationStatusForAppReport =
+ LogAggregationStatus.SUCCEEDED;
+ return LogAggregationStatus.SUCCEEDED;
+ }
+ // All nodes completed but the application is not final yet: falls
+ // through to RUNNING.
+ } else if (logRunningWithFailure > 0) {
+ return LogAggregationStatus.RUNNING_WITH_FAILURE;
+ }
+ return LogAggregationStatus.RUNNING;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ private boolean isLogAggregationFinished() {
+ return this.logAggregationStatusForAppReport
+ .equals(LogAggregationStatus.SUCCEEDED)
+ || this.logAggregationStatusForAppReport
+ .equals(LogAggregationStatus.FAILED)
+ || this.logAggregationStatusForAppReport
+ .equals(LogAggregationStatus.TIME_OUT);
+
+ }
+
+  /**
+   * Tells whether a node report carries the status SUCCEEDED or FAILED.
+   *
+   * @param report node report to inspect
+   * @return true when the report status is SUCCEEDED or FAILED
+   */
+  private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
+    if (report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED) {
+      return true;
+    }
+    return report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
+  }
+
+ private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
+ LogAggregationReport report) {
+ if (report.getDiagnosticMessage() != null
+ && !report.getDiagnosticMessage().isEmpty()) {
+ if (report.getLogAggregationStatus()
+ == LogAggregationStatus.RUNNING ) {
+ List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
+ if (diagnostics == null) {
+ diagnostics = new ArrayList<>();
+ logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
+ } else {
+ if (diagnostics.size()
+ == maxLogAggregationDiagnosticsInMemory) {
+ diagnostics.remove(0);
+ }
+ }
+ diagnostics.add(report.getDiagnosticMessage());
+ this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
+ StringUtils.join(diagnostics, "\n"));
+ } else if (report.getLogAggregationStatus()
+ == LogAggregationStatus.RUNNING_WITH_FAILURE) {
+ List<String> failureMessages =
+ logAggregationFailureMessagesForNMs.get(nodeId);
+ if (failureMessages == null) {
+ failureMessages = new ArrayList<>();
+ logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
+ } else {
+ if (failureMessages.size()
+ == maxLogAggregationDiagnosticsInMemory) {
+ failureMessages.remove(0);
+ }
+ }
+ failureMessages.add(report.getDiagnosticMessage());
+ }
+ }
+ }
+
+ private void updateLogAggregationStatus(NodeId nodeId) {
+ LogAggregationStatus status =
+ this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
+ if (status.equals(LogAggregationStatus.SUCCEEDED)) {
+ this.logAggregationSucceed++;
+ } else if (status.equals(LogAggregationStatus.FAILED)) {
+ this.logAggregationFailed++;
+ }
+ if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
+ this.logAggregationStatusForAppReport =
+ LogAggregationStatus.SUCCEEDED;
+ // Since the log aggregation status for this application for all NMs
+ // is SUCCEEDED, it means all logs are aggregated successfully.
+ // We could remove all the cached log aggregation reports
+ this.logAggregationStatus.clear();
+ this.logAggregationDiagnosticsForNMs.clear();
+ this.logAggregationFailureMessagesForNMs.clear();
+ } else if (this.logAggregationSucceed + this.logAggregationFailed
+ == this.logAggregationStatus.size()) {
+ this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
+ // We have collected the log aggregation status for all NMs.
+ // The log aggregation status is FAILED which means the log
+ // aggregation fails in some NMs. We are only interested in the
+ // nodes where the log aggregation is failed. So we could remove
+ // the log aggregation details for those succeeded NMs
+ this.logAggregationStatus.entrySet().removeIf(entry ->
+ entry.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.SUCCEEDED));
+ // the log aggregation has finished/failed.
+ // and the status will not be updated anymore.
+ this.logAggregationDiagnosticsForNMs.clear();
+ }
+ }
+
+ String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+ this.readLock.lock();
+ try {
+ List<String> failureMessages =
+ this.logAggregationFailureMessagesForNMs.get(nodeId);
+ if (failureMessages == null || failureMessages.isEmpty()) {
+ return StringUtils.EMPTY;
+ }
+ return StringUtils.join(failureMessages, "\n");
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+  /**
+   * Stores the time from which the log aggregation status timeout is
+   * measured.
+   *
+   * @param time start time to record
+   */
+  void recordLogAggregationStartTime(long time) {
+    this.logAggregationStartTime = time;
+  }
+
+  /**
+   * @return the log aggregation enabled flag read from the configuration
+   *         at construction time
+   */
+  public boolean isEnabled() {
+    return this.logAggregationEnabled;
+  }
+
+ private boolean hasReportForNodeManager(NodeId nodeId) {
+ return logAggregationStatus.containsKey(nodeId);
+ }
+
+  /**
+   * Stores a report for a node, replacing any report stored before.
+   *
+   * @param nodeId node the report belongs to
+   * @param report report to store
+   */
+  private void addReportForNodeManager(NodeId nodeId,
+      LogAggregationReport report) {
+    this.logAggregationStatus.put(nodeId, report);
+  }
+
+  /**
+   * @return true when the application-level status is SUCCEEDED, FAILED or
+   *         TIME_OUT
+   */
+  public boolean isFinished() {
+    final boolean finished = isLogAggregationFinished();
+    return finished;
+  }
+
+ private boolean isThereFailureMessageForNM(NodeId nodeId) {
+ return logAggregationFailureMessagesForNMs.get(nodeId) != null
+ && !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty();
+ }
+
+  /**
+   * @return the start time stored by recordLogAggregationStartTime, or 0
+   *         when none has been recorded
+   */
+  long getLogAggregationStartTime() {
+    return this.logAggregationStartTime;
+  }
+
+ void addReportIfNecessary(NodeId nodeId, ApplicationId applicationId) {
+ if (!hasReportForNodeManager(nodeId)) {
+ LogAggregationStatus status = isEnabled() ? LogAggregationStatus.NOT_START
+ : LogAggregationStatus.DISABLED;
+ addReportForNodeManager(nodeId,
+ LogAggregationReport.newInstance(applicationId, status, ""));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org