You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/08/16 10:15:53 UTC

[hadoop] branch branch-3.2 updated: YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new df61637  YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko
df61637 is described below

commit df616370f01494f8a9abfca73465789e16a8a0d8
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Aug 16 11:52:51 2019 +0200

    YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko
---
 .../server/resourcemanager/rmapp/RMAppImpl.java    | 315 ++---------------
 .../resourcemanager/rmapp/RMAppLogAggregation.java | 383 +++++++++++++++++++++
 2 files changed, 410 insertions(+), 288 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d22bb9..9d32257 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -19,18 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -181,19 +177,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       new AppFinishedTransition();
   private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
 
-  private final boolean logAggregationEnabled;
-  private long logAggregationStartTime = 0;
-  private final long logAggregationStatusTimeout;
-  private final Map<NodeId, LogAggregationReport> logAggregationStatus =
-      new ConcurrentHashMap<NodeId, LogAggregationReport>();
-  private volatile LogAggregationStatus logAggregationStatusForAppReport;
-  private int logAggregationSucceed = 0;
-  private int logAggregationFailed = 0;
-  private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
-      new HashMap<NodeId, List<String>>();
-  private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
-      new HashMap<NodeId, List<String>>();
-  private final int maxLogAggregationDiagnosticsInMemory;
+  private final RMAppLogAggregation logAggregation;
   private Map<ApplicationTimeoutType, Long> applicationTimeouts =
       new HashMap<ApplicationTimeoutType, Long>();
 
@@ -510,26 +494,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     applicationSchedulingEnvs
         .putAll(submissionContext.getApplicationSchedulingPropertiesMap());
 
-    long localLogAggregationStatusTimeout =
-        conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
-          YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
-    if (localLogAggregationStatusTimeout <= 0) {
-      this.logAggregationStatusTimeout =
-          YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
-    } else {
-      this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
-    }
-    this.logAggregationEnabled =
-        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
-          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
-    if (this.logAggregationEnabled) {
-      this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
-    } else {
-      this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
-    }
-    maxLogAggregationDiagnosticsInMemory = conf.getInt(
-        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
-        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+    this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);
 
     // amBlacklistingEnabled can be configured globally
     // Just use the global values
@@ -1087,13 +1052,9 @@ public class RMAppImpl implements RMApp, Recoverable {
       // otherwise, add it to ranNodes for further process
       app.ranNodes.add(nodeAddedEvent.getNodeId());
 
-      if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
-        app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
-          LogAggregationReport.newInstance(app.applicationId,
-            app.logAggregationEnabled ? LogAggregationStatus.NOT_START
-                : LogAggregationStatus.DISABLED, ""));
-      }
-    };
+      app.logAggregation.addReportIfNecessary(
+          nodeAddedEvent.getNodeId(), app.getApplicationId());
+    }
   }
 
   // synchronously recover attempt to ensure any incoming external events
@@ -1507,7 +1468,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
 
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.logAggregationStartTime = app.systemClock.getTime();
+      app.logAggregation
+        .recordLogAggregationStartTime(app.systemClock.getTime());
       for (NodeId nodeId : app.getRanNodes()) {
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
@@ -1765,263 +1727,31 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   @Override
   public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
-    try {
-      this.readLock.lock();
-      if (!isLogAggregationFinished() && isAppInFinalState(this) &&
-          systemClock.getTime() > this.logAggregationStartTime
-          + this.logAggregationStatusTimeout) {
-        for (Entry<NodeId, LogAggregationReport> output :
-            logAggregationStatus.entrySet()) {
-          if (!output.getValue().getLogAggregationStatus()
-            .equals(LogAggregationStatus.TIME_OUT)
-              && !output.getValue().getLogAggregationStatus()
-                .equals(LogAggregationStatus.SUCCEEDED)
-              && !output.getValue().getLogAggregationStatus()
-                .equals(LogAggregationStatus.FAILED)) {
-            output.getValue().setLogAggregationStatus(
-              LogAggregationStatus.TIME_OUT);
-          }
-        }
-      }
-      return Collections.unmodifiableMap(logAggregationStatus);
-    } finally {
-      this.readLock.unlock();
-    }
+    return logAggregation.getLogAggregationReportsForApp(this);
   }
 
   public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
-    try {
-      this.writeLock.lock();
-      if (this.logAggregationEnabled && !isLogAggregationFinished()) {
-        LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
-        boolean stateChangedToFinal = false;
-        if (curReport == null) {
-          this.logAggregationStatus.put(nodeId, report);
-          if (isLogAggregationFinishedForNM(report)) {
-            stateChangedToFinal = true;
-          }
-        } else {
-          if (isLogAggregationFinishedForNM(report)) {
-            if (!isLogAggregationFinishedForNM(curReport)) {
-              stateChangedToFinal = true;
-            }
-          }
-          if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
-              || curReport.getLogAggregationStatus() !=
-                  LogAggregationStatus.RUNNING_WITH_FAILURE) {
-            if (curReport.getLogAggregationStatus()
-                == LogAggregationStatus.TIME_OUT
-                && report.getLogAggregationStatus()
-                    == LogAggregationStatus.RUNNING) {
-            // If the log aggregation status got from latest nm heartbeat
-            // is Running, and current log aggregation status is TimeOut,
-            // based on whether there are any failure messages for this NM,
-            // we will reset the log aggregation status as RUNNING or
-            // RUNNING_WITH_FAILURE
-              if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
-                  !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
-                report.setLogAggregationStatus(
-                    LogAggregationStatus.RUNNING_WITH_FAILURE);
-              }
-            }
-            curReport.setLogAggregationStatus(report
-              .getLogAggregationStatus());
-          }
-        }
-        updateLogAggregationDiagnosticMessages(nodeId, report);
-        if (isAppInFinalState(this) && stateChangedToFinal) {
-          updateLogAggregationStatus(nodeId);
-        }
-      }
-    } finally {
-      this.writeLock.unlock();
-    }
+    logAggregation.aggregateLogReport(nodeId, report, this);
   }
 
   @Override
-  public LogAggregationStatus getLogAggregationStatusForAppReport() {
-    try {
-      this.readLock.lock();
-      if (! logAggregationEnabled) {
-        return LogAggregationStatus.DISABLED;
-      }
-      if (isLogAggregationFinished()) {
-        return this.logAggregationStatusForAppReport;
-      }
-      Map<NodeId, LogAggregationReport> reports =
-          getLogAggregationReportsForApp();
-      if (reports.size() == 0) {
-        return this.logAggregationStatusForAppReport;
-      }
-      int logNotStartCount = 0;
-      int logCompletedCount = 0;
-      int logTimeOutCount = 0;
-      int logFailedCount = 0;
-      int logRunningWithFailure = 0;
-      for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
-        switch (report.getValue().getLogAggregationStatus()) {
-          case NOT_START:
-            logNotStartCount++;
-            break;
-          case RUNNING_WITH_FAILURE:
-            logRunningWithFailure ++;
-            break;
-          case SUCCEEDED:
-            logCompletedCount++;
-            break;
-          case FAILED:
-            logFailedCount++;
-            logCompletedCount++;
-            break;
-          case TIME_OUT:
-            logTimeOutCount++;
-            logCompletedCount++;
-            break;
-          default:
-            break;
-        }
-      }
-      if (logNotStartCount == reports.size()) {
-        return LogAggregationStatus.NOT_START;
-      } else if (logCompletedCount == reports.size()) {
-        // We should satisfy two condition in order to return SUCCEEDED or FAILED
-        // 1) make sure the application is in final state
-        // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
-        // The SUCCEEDED/FAILED status is the final status which means
-        // the log aggregation is finished. And the log aggregation status will
-        // not be updated anymore.
-        if (logFailedCount > 0 && isAppInFinalState(this)) {
-          this.logAggregationStatusForAppReport =
-              LogAggregationStatus.FAILED;
-          return LogAggregationStatus.FAILED;
-        } else if (logTimeOutCount > 0) {
-          this.logAggregationStatusForAppReport =
-              LogAggregationStatus.TIME_OUT;
-          return LogAggregationStatus.TIME_OUT;
-        }
-        if (isAppInFinalState(this)) {
-          this.logAggregationStatusForAppReport =
-              LogAggregationStatus.SUCCEEDED;
-          return LogAggregationStatus.SUCCEEDED;
-        }
-      } else if (logRunningWithFailure > 0) {
-        return LogAggregationStatus.RUNNING_WITH_FAILURE;
-      }
-      return LogAggregationStatus.RUNNING;
-    } finally {
-      this.readLock.unlock();
-    }
+  public boolean isLogAggregationFinished() {
+    return logAggregation.isFinished();
   }
 
   @Override
   public boolean isLogAggregationEnabled() {
-    return logAggregationEnabled;
-  }
-
-  @Override
-  public boolean isLogAggregationFinished() {
-    return this.logAggregationStatusForAppReport
-      .equals(LogAggregationStatus.SUCCEEDED)
-        || this.logAggregationStatusForAppReport
-          .equals(LogAggregationStatus.FAILED)
-        || this.logAggregationStatusForAppReport
-          .equals(LogAggregationStatus.TIME_OUT);
-
-  }
-
-  private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
-    return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
-        || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
-  }
-
-  private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
-      LogAggregationReport report) {
-    if (report.getDiagnosticMessage() != null
-        && !report.getDiagnosticMessage().isEmpty()) {
-      if (report.getLogAggregationStatus()
-          == LogAggregationStatus.RUNNING ) {
-        List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
-        if (diagnostics == null) {
-          diagnostics = new ArrayList<String>();
-          logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
-        } else {
-          if (diagnostics.size()
-              == maxLogAggregationDiagnosticsInMemory) {
-            diagnostics.remove(0);
-          }
-        }
-        diagnostics.add(report.getDiagnosticMessage());
-        this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
-          StringUtils.join(diagnostics, "\n"));
-      } else if (report.getLogAggregationStatus()
-          == LogAggregationStatus.RUNNING_WITH_FAILURE) {
-        List<String> failureMessages =
-            logAggregationFailureMessagesForNMs.get(nodeId);
-        if (failureMessages == null) {
-          failureMessages = new ArrayList<String>();
-          logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
-        } else {
-          if (failureMessages.size()
-              == maxLogAggregationDiagnosticsInMemory) {
-            failureMessages.remove(0);
-          }
-        }
-        failureMessages.add(report.getDiagnosticMessage());
-      }
-    }
+    return logAggregation.isEnabled();
   }
 
-  private void updateLogAggregationStatus(NodeId nodeId) {
-    LogAggregationStatus status =
-        this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
-    if (status.equals(LogAggregationStatus.SUCCEEDED)) {
-      this.logAggregationSucceed++;
-    } else if (status.equals(LogAggregationStatus.FAILED)) {
-      this.logAggregationFailed++;
-    }
-    if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
-      this.logAggregationStatusForAppReport =
-          LogAggregationStatus.SUCCEEDED;
-      // Since the log aggregation status for this application for all NMs
-      // is SUCCEEDED, it means all logs are aggregated successfully.
-      // We could remove all the cached log aggregation reports
-      this.logAggregationStatus.clear();
-      this.logAggregationDiagnosticsForNMs.clear();
-      this.logAggregationFailureMessagesForNMs.clear();
-    } else if (this.logAggregationSucceed + this.logAggregationFailed
-        == this.logAggregationStatus.size()) {
-      this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
-      // We have collected the log aggregation status for all NMs.
-      // The log aggregation status is FAILED which means the log
-      // aggregation fails in some NMs. We are only interested in the
-      // nodes where the log aggregation is failed. So we could remove
-      // the log aggregation details for those succeeded NMs
-      for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
-          this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
-        Map.Entry<NodeId, LogAggregationReport> entry = it.next();
-        if (entry.getValue().getLogAggregationStatus()
-          .equals(LogAggregationStatus.SUCCEEDED)) {
-          it.remove();
-        }
-      }
-      // the log aggregation has finished/failed.
-      // and the status will not be updated anymore.
-      this.logAggregationDiagnosticsForNMs.clear();
-    }
+  public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+    return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
   }
 
-  public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
-    try {
-      this.readLock.lock();
-      List<String> failureMessages =
-          this.logAggregationFailureMessagesForNMs.get(nodeId);
-      if (failureMessages == null || failureMessages.isEmpty()) {
-        return StringUtils.EMPTY;
-      }
-      return StringUtils.join(failureMessages, "\n");
-    } finally {
-      this.readLock.unlock();
-    }
+  @Override
+  public LogAggregationStatus getLogAggregationStatusForAppReport() {
+    return logAggregation
+        .getLogAggregationStatusForAppReport(this);
   }
 
   @Override
@@ -2138,4 +1868,13 @@ public class RMAppImpl implements RMApp, Recoverable {
               RMAppState state){
       /* TODO fail the application on the failed transition */
   }
+
+  @VisibleForTesting
+  long getLogAggregationStartTime() {
+    return logAggregation.getLogAggregationStartTime();
+  }
+
+  Clock getSystemClock() {
+    return systemClock;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java
new file mode 100644
index 0000000..b4409ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+/**
+ * Log aggregation logic used by RMApp.
+ *
+ */
+public class RMAppLogAggregation {
+  private final boolean logAggregationEnabled;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private long logAggregationStartTime = 0;
+  private final long logAggregationStatusTimeout;
+  private final Map<NodeId, LogAggregationReport> logAggregationStatus =
+      new ConcurrentHashMap<>();
+  private volatile LogAggregationStatus logAggregationStatusForAppReport;
+  private int logAggregationSucceed = 0;
+  private int logAggregationFailed = 0;
+  private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
+      new HashMap<>();
+  private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
+      new HashMap<>();
+  private final int maxLogAggregationDiagnosticsInMemory;
+
+  RMAppLogAggregation(Configuration conf, ReadLock readLock,
+      WriteLock writeLock) {
+    this.readLock = readLock;
+    this.writeLock = writeLock;
+    this.logAggregationStatusTimeout = getLogAggregationStatusTimeout(conf);
+    this.logAggregationEnabled = getEnabledFlagFromConf(conf);
+    this.logAggregationStatusForAppReport =
+        this.logAggregationEnabled ? LogAggregationStatus.NOT_START :
+            LogAggregationStatus.DISABLED;
+    this.maxLogAggregationDiagnosticsInMemory =
+        getMaxLogAggregationDiagnostics(conf);
+  }
+
+  private long getLogAggregationStatusTimeout(Configuration conf) {
+    long statusTimeout =
+        conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+    if (statusTimeout <= 0) {
+      return YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+    } else {
+      return statusTimeout;
+    }
+  }
+
+  private boolean getEnabledFlagFromConf(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+  }
+
+  private int getMaxLogAggregationDiagnostics(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+  }
+
+  Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp(
+      RMAppImpl rmApp) {
+    this.readLock.lock();
+    try {
+      if (!isLogAggregationFinished() && RMAppImpl.isAppInFinalState(rmApp) &&
+          rmApp.getSystemClock().getTime() > this.logAggregationStartTime
+              + this.logAggregationStatusTimeout) {
+        for (Map.Entry<NodeId, LogAggregationReport> output :
+            logAggregationStatus.entrySet()) {
+          if (!output.getValue().getLogAggregationStatus()
+              .equals(LogAggregationStatus.TIME_OUT)
+              && !output.getValue().getLogAggregationStatus()
+              .equals(LogAggregationStatus.SUCCEEDED)
+              && !output.getValue().getLogAggregationStatus()
+              .equals(LogAggregationStatus.FAILED)) {
+            output.getValue().setLogAggregationStatus(
+                LogAggregationStatus.TIME_OUT);
+          }
+        }
+      }
+      return Collections.unmodifiableMap(logAggregationStatus);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  void aggregateLogReport(NodeId nodeId, LogAggregationReport report,
+      RMAppImpl rmApp) {
+    this.writeLock.lock();
+    try {
+      if (this.logAggregationEnabled && !isLogAggregationFinished()) {
+        LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+        boolean stateChangedToFinal = false;
+        if (curReport == null) {
+          this.logAggregationStatus.put(nodeId, report);
+          if (isLogAggregationFinishedForNM(report)) {
+            stateChangedToFinal = true;
+          }
+        } else {
+          if (isLogAggregationFinishedForNM(report)) {
+            if (!isLogAggregationFinishedForNM(curReport)) {
+              stateChangedToFinal = true;
+            }
+          }
+          if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
+              || curReport.getLogAggregationStatus() !=
+              LogAggregationStatus.RUNNING_WITH_FAILURE) {
+            if (curReport.getLogAggregationStatus()
+                == LogAggregationStatus.TIME_OUT
+                && report.getLogAggregationStatus()
+                == LogAggregationStatus.RUNNING) {
+              // If the log aggregation status got from latest NM heartbeat
+              // is RUNNING, and current log aggregation status is TIME_OUT,
+              // based on whether there are any failure messages for this NM,
+              // we will reset the log aggregation status as RUNNING or
+              // RUNNING_WITH_FAILURE
+              if (isThereFailureMessageForNM(nodeId)) {
+                report.setLogAggregationStatus(
+                    LogAggregationStatus.RUNNING_WITH_FAILURE);
+              }
+            }
+            curReport.setLogAggregationStatus(report
+                .getLogAggregationStatus());
+          }
+        }
+        updateLogAggregationDiagnosticMessages(nodeId, report);
+        if (RMAppImpl.isAppInFinalState(rmApp) && stateChangedToFinal) {
+          updateLogAggregationStatus(nodeId);
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  public LogAggregationStatus getLogAggregationStatusForAppReport(
+      RMAppImpl rmApp) {
+    boolean appInFinalState = RMAppImpl.isAppInFinalState(rmApp);
+    this.readLock.lock();
+    try {
+      if (!logAggregationEnabled) {
+        return LogAggregationStatus.DISABLED;
+      }
+      if (isLogAggregationFinished()) {
+        return this.logAggregationStatusForAppReport;
+      }
+      Map<NodeId, LogAggregationReport> reports =
+          getLogAggregationReportsForApp(rmApp);
+      if (reports.size() == 0) {
+        return this.logAggregationStatusForAppReport;
+      }
+      int logNotStartCount = 0;
+      int logCompletedCount = 0;
+      int logTimeOutCount = 0;
+      int logFailedCount = 0;
+      int logRunningWithFailure = 0;
+      for (Map.Entry<NodeId, LogAggregationReport> report :
+          reports.entrySet()) {
+        switch (report.getValue().getLogAggregationStatus()) {
+          case NOT_START:
+            logNotStartCount++;
+            break;
+          case RUNNING_WITH_FAILURE:
+            logRunningWithFailure ++;
+            break;
+          case SUCCEEDED:
+            logCompletedCount++;
+            break;
+          case FAILED:
+            logFailedCount++;
+            logCompletedCount++;
+            break;
+          case TIME_OUT:
+            logTimeOutCount++;
+            logCompletedCount++;
+            break;
+          default:
+            break;
+        }
+      }
+      if (logNotStartCount == reports.size()) {
+        return LogAggregationStatus.NOT_START;
+      } else if (logCompletedCount == reports.size()) {
+        // We should satisfy two conditions in order to return
+        // SUCCEEDED or FAILED.
+        // 1) make sure the application is in final state
+        // 2) log statuses from all NMs are SUCCEEDED/FAILED/TIME_OUT
+        // The SUCCEEDED/FAILED status is the final status which means
+        // the log aggregation is finished. And the log aggregation status will
+        // not be updated anymore.
+        if (logFailedCount > 0 && appInFinalState) {
+          this.logAggregationStatusForAppReport =
+              LogAggregationStatus.FAILED;
+          return LogAggregationStatus.FAILED;
+        } else if (logTimeOutCount > 0) {
+          this.logAggregationStatusForAppReport =
+              LogAggregationStatus.TIME_OUT;
+          return LogAggregationStatus.TIME_OUT;
+        }
+        if (appInFinalState) {
+          this.logAggregationStatusForAppReport =
+              LogAggregationStatus.SUCCEEDED;
+          return LogAggregationStatus.SUCCEEDED;
+        }
+      } else if (logRunningWithFailure > 0) {
+        return LogAggregationStatus.RUNNING_WITH_FAILURE;
+      }
+      return LogAggregationStatus.RUNNING;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  private boolean isLogAggregationFinished() {
+    return this.logAggregationStatusForAppReport
+        .equals(LogAggregationStatus.SUCCEEDED)
+        || this.logAggregationStatusForAppReport
+        .equals(LogAggregationStatus.FAILED)
+        || this.logAggregationStatusForAppReport
+        .equals(LogAggregationStatus.TIME_OUT);
+
+  }
+
+  private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
+    return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
+        || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
+  }
+
+  private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
+      LogAggregationReport report) {
+    if (report.getDiagnosticMessage() != null
+        && !report.getDiagnosticMessage().isEmpty()) {
+      if (report.getLogAggregationStatus()
+          == LogAggregationStatus.RUNNING ) {
+        List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
+        if (diagnostics == null) {
+          diagnostics = new ArrayList<>();
+          logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
+        } else {
+          if (diagnostics.size()
+              == maxLogAggregationDiagnosticsInMemory) {
+            diagnostics.remove(0);
+          }
+        }
+        diagnostics.add(report.getDiagnosticMessage());
+        this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
+            StringUtils.join(diagnostics, "\n"));
+      } else if (report.getLogAggregationStatus()
+          == LogAggregationStatus.RUNNING_WITH_FAILURE) {
+        List<String> failureMessages =
+            logAggregationFailureMessagesForNMs.get(nodeId);
+        if (failureMessages == null) {
+          failureMessages = new ArrayList<>();
+          logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
+        } else {
+          if (failureMessages.size()
+              == maxLogAggregationDiagnosticsInMemory) {
+            failureMessages.remove(0);
+          }
+        }
+        failureMessages.add(report.getDiagnosticMessage());
+      }
+    }
+  }
+
+  private void updateLogAggregationStatus(NodeId nodeId) {
+    LogAggregationStatus status =
+        this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
+    if (status.equals(LogAggregationStatus.SUCCEEDED)) {
+      this.logAggregationSucceed++;
+    } else if (status.equals(LogAggregationStatus.FAILED)) {
+      this.logAggregationFailed++;
+    }
+    if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
+      this.logAggregationStatusForAppReport =
+          LogAggregationStatus.SUCCEEDED;
+      // Since the log aggregation status for this application for all NMs
+      // is SUCCEEDED, it means all logs are aggregated successfully.
+      // We could remove all the cached log aggregation reports
+      this.logAggregationStatus.clear();
+      this.logAggregationDiagnosticsForNMs.clear();
+      this.logAggregationFailureMessagesForNMs.clear();
+    } else if (this.logAggregationSucceed + this.logAggregationFailed
+        == this.logAggregationStatus.size()) {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
+      // We have collected the log aggregation status for all NMs.
+      // The log aggregation status is FAILED which means the log
+      // aggregation fails in some NMs. We are only interested in the
+      // nodes where the log aggregation is failed. So we could remove
+      // the log aggregation details for those succeeded NMs
+      this.logAggregationStatus.entrySet().removeIf(entry ->
+          entry.getValue().getLogAggregationStatus()
+          .equals(LogAggregationStatus.SUCCEEDED));
+      // The log aggregation has finished/failed,
+      // and the status will not be updated anymore.
+      this.logAggregationDiagnosticsForNMs.clear();
+    }
+  }
+
+  String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+    this.readLock.lock();
+    try {
+      List<String> failureMessages =
+          this.logAggregationFailureMessagesForNMs.get(nodeId);
+      if (failureMessages == null || failureMessages.isEmpty()) {
+        return StringUtils.EMPTY;
+      }
+      return StringUtils.join(failureMessages, "\n");
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  void recordLogAggregationStartTime(long time) {
+    logAggregationStartTime = time;
+  }
+
+  public boolean isEnabled() {
+    return logAggregationEnabled;
+  }
+
+  private boolean hasReportForNodeManager(NodeId nodeId) {
+    return logAggregationStatus.containsKey(nodeId);
+  }
+
+  private void addReportForNodeManager(NodeId nodeId,
+      LogAggregationReport report) {
+    logAggregationStatus.put(nodeId, report);
+  }
+
+  public boolean isFinished() {
+    return isLogAggregationFinished();
+  }
+
+  private boolean isThereFailureMessageForNM(NodeId nodeId) {
+    return logAggregationFailureMessagesForNMs.get(nodeId) != null
+        && !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty();
+  }
+
+  long getLogAggregationStartTime() {
+    return logAggregationStartTime;
+  }
+
+  void addReportIfNecessary(NodeId nodeId, ApplicationId applicationId) {
+    if (!hasReportForNodeManager(nodeId)) {
+      LogAggregationStatus status = isEnabled() ? LogAggregationStatus.NOT_START
+          : LogAggregationStatus.DISABLED;
+      addReportForNodeManager(nodeId,
+          LogAggregationReport.newInstance(applicationId, status, ""));
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org