You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/04 20:25:22 UTC
[13/23] hadoop git commit: MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas
MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe351035
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe351035
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe351035
Branch: refs/heads/HDFS-7240
Commit: fe35103591ece0209f8345aba5544313e45a073c
Parents: c9bf813
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 3 11:01:38 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 3 11:01:38 2018 -0600
----------------------------------------------------------------------
.../hadoop/mapred/TaskAttemptListenerImpl.java | 41 +++++++++++---------
1 file changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe351035/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 67f8ff0..556c90c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -585,33 +585,38 @@ public class TaskAttemptListenerImpl extends CompositeService
private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
TaskAttemptStatus taskAttemptStatus,
AtomicReference<TaskAttemptStatus> lastStatusRef) {
- boolean asyncUpdatedNeeded = false;
- TaskAttemptStatus lastStatus = lastStatusRef.get();
-
- if (lastStatus == null) {
- lastStatusRef.set(taskAttemptStatus);
- asyncUpdatedNeeded = true;
- } else {
- List<TaskAttemptId> oldFetchFailedMaps =
- taskAttemptStatus.fetchFailedMaps;
-
- // merge fetchFailedMaps from the previous update
- if (lastStatus.fetchFailedMaps != null) {
+ List<TaskAttemptId> fetchFailedMaps = taskAttemptStatus.fetchFailedMaps;
+ TaskAttemptStatus lastStatus = null;
+ boolean done = false;
+ while (!done) {
+ lastStatus = lastStatusRef.get();
+ if (lastStatus != null && lastStatus.fetchFailedMaps != null) {
+ // merge fetchFailedMaps from the previous update
if (taskAttemptStatus.fetchFailedMaps == null) {
taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
} else {
- taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+ taskAttemptStatus.fetchFailedMaps =
+ new ArrayList<>(lastStatus.fetchFailedMaps.size() +
+ fetchFailedMaps.size());
+ taskAttemptStatus.fetchFailedMaps.addAll(
+ lastStatus.fetchFailedMaps);
+ taskAttemptStatus.fetchFailedMaps.addAll(
+ fetchFailedMaps);
}
}
- if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
- // update failed - async dispatcher has processed it in the meantime
- taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
- lastStatusRef.set(taskAttemptStatus);
- asyncUpdatedNeeded = true;
+ // lastStatusRef may be changed by either the AsyncDispatcher when
+ // it processes the update, or by another IPC server handler
+ done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus);
+ if (!done) {
+ LOG.info("TaskAttempt " + yarnAttemptID +
+ ": lastStatusRef changed by another thread, retrying...");
+ // let's revert taskAttemptStatus.fetchFailedMaps
+ taskAttemptStatus.fetchFailedMaps = fetchFailedMaps;
}
}
+ boolean asyncUpdatedNeeded = (lastStatus == null);
if (asyncUpdatedNeeded) {
context.getEventHandler().handle(
new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org