You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/04/13 20:34:43 UTC
[hadoop] branch branch-2.9 updated: MAPREDUCE-7272.
TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed
Hussein (ahussein)
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new a23a1c0 MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)
a23a1c0 is described below
commit a23a1c07c4ffa814359c20ce3359249fccbd7ab9
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Mon Apr 13 19:47:27 2020 +0000
MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)
(cherry picked from commit 48a8d7691ead49911a8ff56ba678a093342cabdd)
---
.../hadoop/mapred/TaskAttemptListenerImpl.java | 94 ++++++++++++++++++++--
.../mapred/TestTaskAttemptFinishingMonitor.java | 2 +-
.../hadoop/mapred/TestTaskAttemptListenerImpl.java | 2 +
.../apache/hadoop/mapreduce/v2/app/TestFail.java | 1 +
.../mapreduce/v2/app/TestTaskHeartbeatHandler.java | 2 +
.../org/apache/hadoop/mapreduce/MRJobConfig.java | 23 ++++++
.../hadoop/mapreduce/util/MRJobConfUtil.java | 72 +++++++++++++++++
7 files changed, 189 insertions(+), 7 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 8151286..6d0e781 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -56,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
@@ -89,6 +91,11 @@ public class TaskAttemptListenerImpl extends CompositeService
private ConcurrentMap<TaskAttemptId,
AtomicReference<TaskAttemptStatus>> attemptIdToStatus
= new ConcurrentHashMap<>();
+ /**
+ * A Map to keep track of the history of logging each task attempt.
+ */
+ private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair>
+ taskAttemptLogProgressStamps = new ConcurrentHashMap<>();
private Set<WrappedJvmID> launchedJVMs = Collections
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
@@ -110,10 +117,12 @@ public class TaskAttemptListenerImpl extends CompositeService
@Override
protected void serviceInit(Configuration conf) throws Exception {
- registerHeartbeatHandler(conf);
- commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
- MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
- super.serviceInit(conf);
+ registerHeartbeatHandler(conf);
+ commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
+ MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
+ // initialize the delta threshold for logging the task progress.
+ MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf);
+ super.serviceInit(conf);
}
@Override
@@ -164,6 +173,9 @@ public class TaskAttemptListenerImpl extends CompositeService
@Override
protected void serviceStop() throws Exception {
stopRpcServer();
+ if (taskAttemptLogProgressStamps != null) {
+ taskAttemptLogProgressStamps.clear();
+ }
super.serviceStop();
}
@@ -359,8 +371,15 @@ public class TaskAttemptListenerImpl extends CompositeService
taskAttemptStatus.id = yarnAttemptID;
// Task sends the updated progress to the TT.
taskAttemptStatus.progress = taskStatus.getProgress();
- LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
- + taskStatus.getProgress());
+ // log the new progress
+ TaskProgressLogPair logPair =
+ taskAttemptLogProgressStamps.get(taskAttemptID);
+ if (logPair == null) {
+ taskAttemptLogProgressStamps.putIfAbsent(taskAttemptID,
+ new TaskProgressLogPair(taskAttemptID));
+ logPair = taskAttemptLogProgressStamps.get(taskAttemptID);
+ }
+ logPair.update(taskStatus.getProgress());
// Task sends the updated state-string to the TT.
taskAttemptStatus.stateString = taskStatus.getStateString();
// Task sends the updated phase to the TT.
@@ -574,4 +593,67 @@ public class TaskAttemptListenerImpl extends CompositeService
AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
return attemptIdToStatus;
}
+
+ /**
+ * Entity to keep track of the taskAttempt, last time it was logged,
+ * and the progress that has been logged.
+ */
+ class TaskProgressLogPair {
+
+ /**
+ * The taskAttemptId of that history record.
+ */
+ private final TaskAttemptID taskAttemptID;
+ /**
+ * Timestamp of last time the progress was logged.
+ */
+ private volatile long logTimeStamp;
+ /**
+ * Snapshot of the last logged progress.
+ */
+ private volatile double prevProgress;
+
+ TaskProgressLogPair(final TaskAttemptID attemptID) {
+ taskAttemptID = attemptID;
+ prevProgress = 0.0;
+ logTimeStamp = 0;
+ }
+
+ private void resetLog(final boolean doLog,
+ final float progress, final double processedProgress,
+ final long timestamp) {
+ if (doLog) {
+ prevProgress = processedProgress;
+ logTimeStamp = timestamp;
+ LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+ + progress);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : "
+ + progress);
+ }
+ }
+ }
+
+ public void update(final float progress) {
+ final double processedProgress =
+ MRJobConfUtil.convertTaskProgressToFactor(progress);
+ final double diffProgress = processedProgress - prevProgress;
+ final long currentTime = Time.monotonicNow();
+ boolean result =
+ (Double.compare(diffProgress,
+ MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0);
+ if (!result) {
+ // check if time has expired.
+ result = ((currentTime - logTimeStamp)
+ >= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold());
+ }
+ // It is helpful to log the progress when it reaches 1.0F.
+ if (Float.compare(progress, 1.0f) == 0) {
+ result = true;
+ taskAttemptLogProgressStamps.remove(taskAttemptID);
+ }
+ resetLog(result, progress, processedProgress, currentTime);
+ }
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
index 32e6867..46b40bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
@@ -50,7 +50,7 @@ public class TestTaskAttemptFinishingMonitor {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
-
+ conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index 53a2ba0..023ca4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -423,6 +423,8 @@ public class TestTaskAttemptListenerImpl {
Configuration conf = new Configuration();
conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+ conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
+ conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 1);
tal.init(conf);
tal.start();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 75c78fd..81cad82 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -268,6 +268,7 @@ public class TestFail {
protected void serviceInit(Configuration conf) throws Exception {
conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
+ conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
super.serviceInit(conf);
}
};
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
index 5d86479..ca03958 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
@@ -58,6 +58,7 @@ public class TestTaskHeartbeatHandler {
// so that TASK_TIMEOUT is not overridden
conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5);
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
+ conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
hb.init(conf);
hb.start();
@@ -117,6 +118,7 @@ public class TestTaskHeartbeatHandler {
new TaskHeartbeatHandler(mockHandler, clock, 1);
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+ conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
hb.init(conf);
hb.start();
try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index d666123..15c8f32 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -346,6 +346,29 @@ public interface MRJobConfig {
public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
+ /**
+ * TaskAttemptListenerImpl will log the task progress when the delta progress
+ * is larger than or equal to the defined value.
+ * The double value has to be between 0 and 1, with two decimals.
+ */
+ String TASK_LOG_PROGRESS_DELTA_THRESHOLD =
+ "mapreduce.task.log.progress.delta.threshold";
+ /**
+ * Default delta progress is set to 5%.
+ */
+ double TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT = 0.05;
+ /**
+ * TaskAttemptListenerImpl will log the task progress when the defined value
+ * in seconds expires.
+ * This helps to debug task attempts that are making very slow progress.
+ */
+ String TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS =
+ "mapreduce.task.log.progress.wait.interval-seconds";
+ /**
+ * Default period to log the task attempt progress is 60 seconds.
+ */
+ long TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT = 60L;
+
public static final String TASK_ID = "mapreduce.task.id";
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
index afedef3..4e4e78e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapreduce.util;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -58,4 +59,75 @@ public final class MRJobConfUtil {
}
public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f;
+
+ /**
+ * Configurations to control how frequently task attempt progress is logged.
+ */
+ public static final double PROGRESS_MIN_DELTA_FACTOR = 100.0;
+ private static volatile Double progressMinDeltaThreshold = null;
+ private static volatile Long progressMaxWaitDeltaTimeThreshold = null;
+
+ /**
+ * Loads the values defined in the configuration, including the delta
+ * progress and the maximum time between log messages.
+ * @param conf
+ */
+ public static void setTaskLogProgressDeltaThresholds(
+ final Configuration conf) {
+ if (progressMinDeltaThreshold == null) {
+ progressMinDeltaThreshold =
+ new Double(PROGRESS_MIN_DELTA_FACTOR
+ * conf.getDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD,
+ MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT));
+ }
+
+ if (progressMaxWaitDeltaTimeThreshold == null) {
+ progressMaxWaitDeltaTimeThreshold =
+ TimeUnit.SECONDS.toMillis(conf
+ .getLong(
+ MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS,
+ MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT));
+ }
+ }
+
+ /**
+ * Retrieves the min delta progress required to log the task attempt's
+ * current progress.
+ * @return the defined threshold in the conf.
+ * returns the default value if
+ * {@link #setTaskLogProgressDeltaThresholds} has not been called.
+ */
+ public static double getTaskProgressMinDeltaThreshold() {
+ if (progressMinDeltaThreshold == null) {
+ return PROGRESS_MIN_DELTA_FACTOR
+ * MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT;
+ }
+ return progressMinDeltaThreshold.doubleValue();
+ }
+
+ /**
+ * Retrieves the time, in milliseconds, that must elapse since the last log
+ * message before the task attempt's current progress is logged again.
+ * @return the defined threshold in the conf.
+ * returns the default value if
+ * {@link #setTaskLogProgressDeltaThresholds} has not been called.
+ */
+ public static long getTaskProgressWaitDeltaTimeThreshold() {
+ if (progressMaxWaitDeltaTimeThreshold == null) {
+ return TimeUnit.SECONDS.toMillis(
+ MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT);
+ }
+ return progressMaxWaitDeltaTimeThreshold.longValue();
+ }
+
+ /**
+ * Converts a progress between 0.0 to 1.0 to double format used to log the
+ * task attempt.
+ * @param progress of the task which is a value between 0.0 and 1.0.
+ * @return the largest integral double value that is less than or equal to
+ * the argument multiplied by {@link #PROGRESS_MIN_DELTA_FACTOR}.
+ */
+ public static double convertTaskProgressToFactor(final float progress) {
+ return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org