You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/04 20:25:22 UTC

[13/23] hadoop git commit: MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas

MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe351035
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe351035
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe351035

Branch: refs/heads/HDFS-7240
Commit: fe35103591ece0209f8345aba5544313e45a073c
Parents: c9bf813
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 3 11:01:38 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 3 11:01:38 2018 -0600

----------------------------------------------------------------------
 .../hadoop/mapred/TaskAttemptListenerImpl.java  | 41 +++++++++++---------
 1 file changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe351035/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 67f8ff0..556c90c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -585,33 +585,38 @@ public class TaskAttemptListenerImpl extends CompositeService
   private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
       TaskAttemptStatus taskAttemptStatus,
       AtomicReference<TaskAttemptStatus> lastStatusRef) {
-    boolean asyncUpdatedNeeded = false;
-    TaskAttemptStatus lastStatus = lastStatusRef.get();
-
-    if (lastStatus == null) {
-      lastStatusRef.set(taskAttemptStatus);
-      asyncUpdatedNeeded = true;
-    } else {
-      List<TaskAttemptId> oldFetchFailedMaps =
-          taskAttemptStatus.fetchFailedMaps;
-
-      // merge fetchFailedMaps from the previous update
-      if (lastStatus.fetchFailedMaps != null) {
+    List<TaskAttemptId> fetchFailedMaps = taskAttemptStatus.fetchFailedMaps;
+    TaskAttemptStatus lastStatus = null;
+    boolean done = false;
+    while (!done) {
+      lastStatus = lastStatusRef.get();
+      if (lastStatus != null && lastStatus.fetchFailedMaps != null) {
+        // merge fetchFailedMaps from the previous update
         if (taskAttemptStatus.fetchFailedMaps == null) {
           taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
         } else {
-          taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps =
+              new ArrayList<>(lastStatus.fetchFailedMaps.size() +
+                  fetchFailedMaps.size());
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              fetchFailedMaps);
         }
       }
 
-      if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
-        // update failed - async dispatcher has processed it in the meantime
-        taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
-        lastStatusRef.set(taskAttemptStatus);
-        asyncUpdatedNeeded = true;
+      // lastStatusRef may be changed by either the AsyncDispatcher when
+      // it processes the update, or by another IPC server handler
+      done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus);
+      if (!done) {
+        LOG.info("TaskAttempt " + yarnAttemptID +
+            ": lastStatusRef changed by another thread, retrying...");
+        // let's revert taskAttemptStatus.fetchFailedMaps
+        taskAttemptStatus.fetchFailedMaps = fetchFailedMaps;
       }
     }
 
+    boolean asyncUpdatedNeeded = (lastStatus == null);
     if (asyncUpdatedNeeded) {
       context.getEventHandler().handle(
           new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org