You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/04/13 19:49:59 UTC

[hadoop] branch branch-2.10 updated: MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 48a8d76  MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)
48a8d76 is described below

commit 48a8d7691ead49911a8ff56ba678a093342cabdd
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Mon Apr 13 19:47:27 2020 +0000

    MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)
---
 .../hadoop/mapred/TaskAttemptListenerImpl.java     | 94 ++++++++++++++++++++--
 .../mapred/TestTaskAttemptFinishingMonitor.java    |  2 +-
 .../hadoop/mapred/TestTaskAttemptListenerImpl.java |  2 +
 .../apache/hadoop/mapreduce/v2/app/TestFail.java   |  1 +
 .../mapreduce/v2/app/TestTaskHeartbeatHandler.java |  2 +
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   | 23 ++++++
 .../hadoop/mapreduce/util/MRJobConfUtil.java       | 72 +++++++++++++++++
 7 files changed, 189 insertions(+), 7 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 8151286..6d0e781 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -56,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -89,6 +91,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap<TaskAttemptId,
       AtomicReference<TaskAttemptStatus>> attemptIdToStatus
         = new ConcurrentHashMap<>();
+  /**
+   * A Map to keep track of the history of logging each task attempt.
+   */
+  private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair>
+      taskAttemptLogProgressStamps = new ConcurrentHashMap<>();
 
   private Set<WrappedJvmID> launchedJVMs = Collections
       .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
@@ -110,10 +117,12 @@ public class TaskAttemptListenerImpl extends CompositeService
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-   registerHeartbeatHandler(conf);
-   commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
-       MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
-   super.serviceInit(conf);
+    registerHeartbeatHandler(conf);
+    commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
+        MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
+    // initialize the delta threshold for logging the task progress.
+    MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf);
+    super.serviceInit(conf);
   }
 
   @Override
@@ -164,6 +173,9 @@ public class TaskAttemptListenerImpl extends CompositeService
   @Override
   protected void serviceStop() throws Exception {
     stopRpcServer();
+    if (taskAttemptLogProgressStamps != null) {
+      taskAttemptLogProgressStamps.clear();
+    }
     super.serviceStop();
   }
 
@@ -359,8 +371,15 @@ public class TaskAttemptListenerImpl extends CompositeService
     taskAttemptStatus.id = yarnAttemptID;
     // Task sends the updated progress to the TT.
     taskAttemptStatus.progress = taskStatus.getProgress();
-    LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
-        + taskStatus.getProgress());
+    // log the new progress
+    TaskProgressLogPair logPair =
+        taskAttemptLogProgressStamps.get(taskAttemptID);
+    if (logPair == null) {
+      taskAttemptLogProgressStamps.putIfAbsent(taskAttemptID,
+          new TaskProgressLogPair(taskAttemptID));
+      logPair = taskAttemptLogProgressStamps.get(taskAttemptID);
+    }
+    logPair.update(taskStatus.getProgress());
     // Task sends the updated state-string to the TT.
     taskAttemptStatus.stateString = taskStatus.getStateString();
     // Task sends the updated phase to the TT.
@@ -574,4 +593,67 @@ public class TaskAttemptListenerImpl extends CompositeService
       AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
     return attemptIdToStatus;
   }
+
+  /**
+   * Tracks, for one task attempt, the progress last logged at INFO and
+   * when it was logged, so that later updates can be logged selectively.
+   */
+  class TaskProgressLogPair {
+
+    /**
+     * The task attempt whose progress logging is tracked by this record.
+     */
+    private final TaskAttemptID taskAttemptID;
+    /**
+     * Time.monotonicNow() value when the progress was last logged at INFO.
+     */
+    private volatile long logTimeStamp;
+    /**
+     * Snapshot of the scaled progress that was last logged at INFO.
+     */
+    private volatile double prevProgress;
+
+    TaskProgressLogPair(final TaskAttemptID attemptID) {
+      taskAttemptID = attemptID;
+      prevProgress = 0.0;
+      logTimeStamp = 0;
+    }
+
+    private void resetLog(final boolean doLog,
+        final float progress, final double processedProgress,
+        final long timestamp) {
+      if (doLog) {
+        prevProgress = processedProgress;
+        logTimeStamp = timestamp;
+        LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+            + progress);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : "
+              + progress);
+        }
+      }
+    }
+
+    public void update(final float progress) {
+      final double processedProgress =
+          MRJobConfUtil.convertTaskProgressToFactor(progress);
+      final double diffProgress = processedProgress - prevProgress;
+      final long currentTime = Time.monotonicNow();
+      boolean result =
+          (Double.compare(diffProgress,
+              MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0);
+      if (!result) {
+        // not enough progress; log anyway once the wait interval has passed.
+        result = ((currentTime - logTimeStamp)
+            >= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold());
+      }
+      // Always log when progress reaches 1.0F and stop tracking the attempt.
+      if (Float.compare(progress, 1.0f) == 0) {
+        result = true;
+        taskAttemptLogProgressStamps.remove(taskAttemptID);
+      }
+      resetLog(result, progress, processedProgress, currentTime);
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
index 32e6867..46b40bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
@@ -50,7 +50,7 @@ public class TestTaskAttemptFinishingMonitor {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
     conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
-
+    conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
     AppContext appCtx = mock(AppContext.class);
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
     RMHeartbeatHandler rmHeartbeatHandler =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index 53a2ba0..023ca4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -423,6 +423,8 @@ public class TestTaskAttemptListenerImpl {
 
     Configuration conf = new Configuration();
     conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+    conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
+    conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 1);
     tal.init(conf);
     tal.start();
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 75c78fd..81cad82 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -268,6 +268,7 @@ public class TestFail {
         protected void serviceInit(Configuration conf) throws Exception {
           conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
           conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
+          conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
           super.serviceInit(conf);
         }
       };
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
index 5d86479..ca03958 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
@@ -58,6 +58,7 @@ public class TestTaskHeartbeatHandler {
     // so that TASK_TIMEOUT is not overridden
     conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5);
     conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
+    conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
     
     hb.init(conf);
     hb.start();
@@ -117,6 +118,7 @@ public class TestTaskHeartbeatHandler {
         new TaskHeartbeatHandler(mockHandler, clock, 1);
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+    conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
     hb.init(conf);
     hb.start();
     try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 37be797..d22b9ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -346,6 +346,29 @@ public interface MRJobConfig {
 
   public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
 
+  /**
+   * TaskAttemptListenerImpl will log the task progress when the delta progress
+   * is larger than or equal to the defined value.
+   * The double value has to be between 0 and 1, with two decimals.
+   */
+  String TASK_LOG_PROGRESS_DELTA_THRESHOLD =
+      "mapreduce.task.log.progress.delta.threshold";
+  /**
+   * The default delta progress is 0.05, i.e. 5%.
+   */
+  double TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT = 0.05;
+  /**
+   * TaskAttemptListenerImpl will log the task progress when the defined
+   * interval, in seconds, has elapsed since the progress was last logged.
+   * This helps to debug task attempts that are making very slow progress.
+   */
+  String TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS =
+      "mapreduce.task.log.progress.wait.interval-seconds";
+  /**
+   * Default period to log the task attempt progress is 60 seconds.
+   */
+  long TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT = 60L;
+
   public static final String TASK_ID = "mapreduce.task.id";
 
   public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
index afedef3..4e4e78e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapreduce.util;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
@@ -58,4 +59,75 @@ public final class MRJobConfUtil {
   }
 
   public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f;
+
+  /**
+   * Settings that control how often task attempt progress is logged.
+   */
+  public static final double PROGRESS_MIN_DELTA_FACTOR = 100.0;
+  private static volatile Double progressMinDeltaThreshold = null;
+  private static volatile Long progressMaxWaitDeltaTimeThreshold = null;
+
+  /**
+   * Loads the min progress delta and the max wait time between task progress
+   * log messages from the configuration; values already set are kept.
+   * @param conf the configuration to read the thresholds from.
+   */
+  public static void setTaskLogProgressDeltaThresholds(
+      final Configuration conf) {
+    if (progressMinDeltaThreshold == null) {
+      progressMinDeltaThreshold =
+          Double.valueOf(PROGRESS_MIN_DELTA_FACTOR
+              * conf.getDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD,
+              MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT));
+    }
+
+    if (progressMaxWaitDeltaTimeThreshold == null) {
+      progressMaxWaitDeltaTimeThreshold =
+          TimeUnit.SECONDS.toMillis(conf
+              .getLong(
+                  MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS,
+                  MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT));
+    }
+  }
+
+  /**
+   * Returns the smallest progress delta, scaled by
+   * {@link #PROGRESS_MIN_DELTA_FACTOR}, for which progress is logged.
+   * @return the threshold loaded from the conf, or the scaled default if
+   *         {@link #setTaskLogProgressDeltaThresholds} has not loaded one
+   *         yet.
+   */
+  public static double getTaskProgressMinDeltaThreshold() {
+    final Double configured = progressMinDeltaThreshold;
+    return configured != null
+        ? configured.doubleValue()
+        : PROGRESS_MIN_DELTA_FACTOR
+            * MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT;
+  }
+
+  /**
+   * Returns, in milliseconds, the wait interval after which the task attempt
+   * progress is logged even if the delta threshold has not been reached.
+   * @return the interval loaded from the conf, or the default if
+   *         {@link #setTaskLogProgressDeltaThresholds} has not loaded one.
+   */
+  public static long getTaskProgressWaitDeltaTimeThreshold() {
+    final Long configured = progressMaxWaitDeltaTimeThreshold;
+    if (configured != null) {
+      return configured.longValue();
+    }
+    return TimeUnit.SECONDS.toMillis(
+        MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT);
+  }
+
+  /**
+   * Converts a progress between 0.0 and 1.0 to the scaled double format used
+   * when deciding whether to log the task attempt progress.
+   * @param progress of the task which is a value between 0.0 and 1.0.
+   * @return the floor of progress times {@link #PROGRESS_MIN_DELTA_FACTOR}.
+   */
+  public static double convertTaskProgressToFactor(final float progress) {
+    final double scaled = progress * PROGRESS_MIN_DELTA_FACTOR;
+    return Math.floor(scaled);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org