Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2017/12/01 20:09:16 UTC

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0faf50624 -> 21d362735


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273

Branch: refs/heads/trunk
Commit: 21d36273551fa45c4130e5523b6724358cf34b1e
Parents: 0faf506
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Dec 1 14:03:01 2017 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Dec 1 14:04:25 2017 -0600

----------------------------------------------------------------------
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  69 +++-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java        |  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java     | 315 ++++++++++++-------
 .../mapreduce/v2/app/TestFetchFailure.java      |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 302 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
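
In short, the change adds flow control for task status updates flowing from the TaskAttemptListener into the AM's event dispatcher: every registered attempt gets an AtomicReference<TaskAttemptStatus> slot, statusUpdate() stores the latest report in that slot (merging fetchFailedMaps from the previous report so none are dropped), and a TaskAttemptStatusUpdateEvent is dispatched only when no event is already pending for that attempt. The diffs below show the real implementation; what follows is a minimal standalone sketch of the producer side of this single-slot coalescing pattern, using illustrative class names rather than the actual Hadoop API:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/**
 * One single-slot "mailbox" per producer: a slow consumer sees at most one
 * pending notification per producer, no matter how fast updates arrive.
 */
class CoalescingMailbox<T> {
  private final AtomicReference<T> slot = new AtomicReference<>();
  private final BlockingQueue<AtomicReference<T>> eventQueue;

  CoalescingMailbox(BlockingQueue<AtomicReference<T>> eventQueue) {
    this.eventQueue = eventQueue;
  }

  /**
   * Publish the latest value. A notification is enqueued only if the slot
   * was empty, i.e. the consumer has already drained the previous value.
   */
  void publish(T latest) throws InterruptedException {
    if (slot.getAndSet(latest) == null) {
      eventQueue.put(slot);
    }
    // else: a notification is already pending and will deliver this value
  }
}

The listener itself is more careful than this sketch: coalesceStatusUpdate() in the diff below uses compareAndSet so it can tell whether the dispatcher consumed the previous report while the merge was in progress, and it accumulates fetchFailedMaps across coalesced reports.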


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 9b6148c..67f8ff0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
-@SuppressWarnings({"unchecked"})
 public class TaskAttemptListenerImpl extends CompositeService 
     implements TaskUmbilicalProtocol, TaskAttemptListener {
 
@@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
+
+  private ConcurrentMap<TaskAttemptId,
+      AtomicReference<TaskAttemptStatus>> attemptIdToStatus
+        = new ConcurrentHashMap<>();
+
   private Set<WrappedJvmID> launchedJVMs = Collections
       .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
 
@@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
 
+    AtomicReference<TaskAttemptStatus> lastStatusRef =
+        attemptIdToStatus.get(yarnAttemptID);
+    if (lastStatusRef == null) {
+      throw new IllegalStateException("Status update was called"
+          + " with illegal TaskAttemptId: " + yarnAttemptID);
+    }
+
     AMFeedback feedback = new AMFeedback();
     feedback.setTaskFound(true);
 
@@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService
 //    // isn't ever changed by the Task itself.
 //    taskStatus.getIncludeCounters();
 
-    context.getEventHandler().handle(
-        new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
-            taskAttemptStatus));
+    coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef);
+
     return feedback;
   }
 
@@ -520,6 +535,8 @@ public class TaskAttemptListenerImpl extends CompositeService
     launchedJVMs.add(jvmId);
 
     taskHeartbeatHandler.register(attemptID);
+
+    attemptIdToStatus.put(attemptID, new AtomicReference<>());
   }
 
   @Override
@@ -541,6 +558,8 @@ public class TaskAttemptListenerImpl extends CompositeService
 
     //unregister this attempt
     taskHeartbeatHandler.unregister(attemptID);
+
+    attemptIdToStatus.remove(attemptID);
   }
 
   @Override
@@ -563,4 +582,46 @@ public class TaskAttemptListenerImpl extends CompositeService
     preemptionPolicy.setCheckpointID(tid, cid);
   }
 
+  private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
+      TaskAttemptStatus taskAttemptStatus,
+      AtomicReference<TaskAttemptStatus> lastStatusRef) {
+    boolean asyncUpdatedNeeded = false;
+    TaskAttemptStatus lastStatus = lastStatusRef.get();
+
+    if (lastStatus == null) {
+      lastStatusRef.set(taskAttemptStatus);
+      asyncUpdatedNeeded = true;
+    } else {
+      List<TaskAttemptId> oldFetchFailedMaps =
+          taskAttemptStatus.fetchFailedMaps;
+
+      // merge fetchFailedMaps from the previous update
+      if (lastStatus.fetchFailedMaps != null) {
+        if (taskAttemptStatus.fetchFailedMaps == null) {
+          taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
+        } else {
+          taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+        }
+      }
+
+      if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
+        // update failed - async dispatcher has processed it in the meantime
+        taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
+        lastStatusRef.set(taskAttemptStatus);
+        asyncUpdatedNeeded = true;
+      }
+    }
+
+    if (asyncUpdatedNeeded) {
+      context.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
+              lastStatusRef));
+    }
+  }
+
+  @VisibleForTesting
+  ConcurrentMap<TaskAttemptId,
+      AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
+    return attemptIdToStatus;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
index 715f63d..cef4fd0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.event;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@@ -26,17 +27,16 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
-
-  private TaskAttemptStatus reportedTaskAttemptStatus;
+  private AtomicReference<TaskAttemptStatus> taskAttemptStatusRef;
 
   public TaskAttemptStatusUpdateEvent(TaskAttemptId id,
-      TaskAttemptStatus taskAttemptStatus) {
+      AtomicReference<TaskAttemptStatus> taskAttemptStatusRef) {
     super(id, TaskAttemptEventType.TA_UPDATE);
-    this.reportedTaskAttemptStatus = taskAttemptStatus;
+    this.taskAttemptStatusRef = taskAttemptStatusRef;
   }
 
-  public TaskAttemptStatus getReportedTaskAttemptStatus() {
-    return reportedTaskAttemptStatus;
+  public AtomicReference<TaskAttemptStatus> getTaskAttemptStatusRef() {
+    return taskAttemptStatusRef;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 90e0d21..431128b 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1780,7 +1781,6 @@ public abstract class TaskAttemptImpl implements
     taskAttempt.updateProgressSplits();
   }
 
-
   static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     private final boolean rescheduled;
@@ -1965,6 +1965,7 @@ public abstract class TaskAttemptImpl implements
       // register it to TaskAttemptListener so that it can start monitoring it.
       taskAttempt.taskAttemptListener
         .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
+
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
           NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
@@ -2430,15 +2431,20 @@ public abstract class TaskAttemptImpl implements
   }
 
   private static class StatusUpdater 
-       implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+      implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
-      // Status update calls don't really change the state of the attempt.
+      TaskAttemptStatusUpdateEvent statusEvent =
+          ((TaskAttemptStatusUpdateEvent)event);
+
+      AtomicReference<TaskAttemptStatus> taskAttemptStatusRef =
+          statusEvent.getTaskAttemptStatusRef();
+
       TaskAttemptStatus newReportedStatus =
-          ((TaskAttemptStatusUpdateEvent) event)
-              .getReportedTaskAttemptStatus();
+          taskAttemptStatusRef.getAndSet(null);
+
       // Now switch the information in the reportedStatus
       taskAttempt.reportedStatus = newReportedStatus;
       taskAttempt.reportedStatus.taskState = taskAttempt.getState();
@@ -2447,12 +2453,10 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.eventHandler.handle
           (new SpeculatorEvent
               (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-      
       taskAttempt.updateProgressSplits();
-      
       //if fetch failures are present, send the fetch failure event to job
       //this only will happen in reduce attempt type
-      if (taskAttempt.reportedStatus.fetchFailedMaps != null && 
+      if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
           taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
         String hostname = taskAttempt.container == null ? "UNKNOWN"
             : taskAttempt.container.getNodeId().getHost();
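
On the dispatcher side, StatusUpdater now pulls the AtomicReference out of the event and drains it with getAndSet(null); emptying the slot is what allows the listener to enqueue the next event for that attempt. A matching sketch of that consumer loop, again with illustrative names only:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Drains one coalesced value per dequeued notification. */
class CoalescingConsumer<T> implements Runnable {
  private final BlockingQueue<AtomicReference<T>> eventQueue;

  CoalescingConsumer(BlockingQueue<AtomicReference<T>> eventQueue) {
    this.eventQueue = eventQueue;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        AtomicReference<T> slot = eventQueue.take();
        // Taking the value out of the slot re-arms the producer: its next
        // publish() will see null and enqueue a fresh notification.
        T latest = slot.getAndSet(null);
        if (latest != null) {
          process(latest);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private void process(T latest) {
    System.out.println("processing " + latest);
  }
}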

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index fa8418a..4ff6fb2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -35,6 +37,7 @@ import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -42,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -52,12 +57,69 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.SystemClock;
-
+import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+/**
+ * Tests the behavior of TaskAttemptListenerImpl.
+ */
+@RunWith(MockitoJUnitRunner.class)
 public class TestTaskAttemptListenerImpl {
+  private static final String ATTEMPT1_ID =
+      "attempt_123456789012_0001_m_000001_0";
+  private static final String ATTEMPT2_ID =
+      "attempt_123456789012_0001_m_000002_0";
+
+  private static final TaskAttemptId TASKATTEMPTID1 =
+      TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
+  private static final TaskAttemptId TASKATTEMPTID2 =
+      TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
+
+  @Mock
+  private AppContext appCtx;
+
+  @Mock
+  private JobTokenSecretManager secret;
+
+  @Mock
+  private RMHeartbeatHandler rmHeartbeatHandler;
+
+  @Mock
+  private TaskHeartbeatHandler hbHandler;
+
+  @Mock
+  private Dispatcher dispatcher;
+
+  @Mock
+  private Task task;
+
+  @SuppressWarnings("rawtypes")
+  @Mock
+  private EventHandler<Event> ea;
+
+  @SuppressWarnings("rawtypes")
+  @Captor
+  private ArgumentCaptor<Event> eventCaptor;
+
+  private CheckpointAMPreemptionPolicy policy;
+  private JVMId id;
+  private WrappedJvmID wid;
+  private TaskAttemptID attemptID;
+  private TaskAttemptId attemptId;
+  private ReduceTaskStatus firstReduceStatus;
+  private ReduceTaskStatus secondReduceStatus;
+  private ReduceTaskStatus thirdReduceStatus;
+
+  private MockTaskAttemptListenerImpl listener;
+
   public static class MockTaskAttemptListenerImpl
       extends TaskAttemptListenerImpl {
 
@@ -93,34 +155,24 @@ public class TestTaskAttemptListenerImpl {
       //Empty
     }
   }
-  
+
+  @After
+  public void after() throws IOException {
+    if (listener != null) {
+      listener.close();
+      listener = null;
+    }
+  }
+
   @Test  (timeout=5000)
   public void testGetTask() throws IOException {
-    AppContext appCtx = mock(AppContext.class);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    Dispatcher dispatcher = mock(Dispatcher.class);
-    @SuppressWarnings("unchecked")
-    EventHandler<Event> ea = mock(EventHandler.class);
-    when(dispatcher.getEventHandler()).thenReturn(ea);
-
-    when(appCtx.getEventHandler()).thenReturn(ea);
-    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
-    policy.init(appCtx);
-    MockTaskAttemptListenerImpl listener = 
-      new MockTaskAttemptListenerImpl(appCtx, secret,
-          rmHeartbeatHandler, hbHandler, policy);
-    Configuration conf = new Configuration();
-    listener.init(conf);
-    listener.start();
-    JVMId id = new JVMId("foo",1, true, 1);
-    WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+    configureMocks();
+    startListener(false);
 
     // Verify ask before registration.
     //The JVM ID has not been registered yet so we should kill it.
     JvmContext context = new JvmContext();
+
     context.jvmId = id; 
     JvmTask result = listener.getTask(context);
     assertNotNull(result);
@@ -128,20 +180,18 @@ public class TestTaskAttemptListenerImpl {
 
     // Verify ask after registration but before launch. 
     // Don't kill, should be null.
-    TaskAttemptId attemptID = mock(TaskAttemptId.class);
-    Task task = mock(Task.class);
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
     result = listener.getTask(context);
     assertNull(result);
     // Unregister for more testing.
-    listener.unregister(attemptID, wid);
+    listener.unregister(attemptId, wid);
 
     // Verify ask after registration and launch
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
-    listener.registerLaunchedTask(attemptID, wid);
-    verify(hbHandler).register(attemptID);
+    listener.registerLaunchedTask(attemptId, wid);
+    verify(hbHandler).register(attemptId);
     result = listener.getTask(context);
     assertNotNull(result);
     assertFalse(result.shouldDie);
@@ -152,15 +202,13 @@ public class TestTaskAttemptListenerImpl {
     assertNotNull(result);
     assertTrue(result.shouldDie);
 
-    listener.unregister(attemptID, wid);
+    listener.unregister(attemptId, wid);
 
     // Verify after unregistration.
     result = listener.getTask(context);
     assertNotNull(result);
     assertTrue(result.shouldDie);
 
-    listener.stop();
-
     // test JVMID
     JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
     assertNotNull(jvmid);
@@ -206,20 +254,10 @@ public class TestTaskAttemptListenerImpl {
     when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
         TypeConverter.fromYarn(empty));
 
-    AppContext appCtx = mock(AppContext.class);
+    configureMocks();
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    Dispatcher dispatcher = mock(Dispatcher.class);
-    @SuppressWarnings("unchecked")
-    EventHandler<Event> ea = mock(EventHandler.class);
-    when(dispatcher.getEventHandler()).thenReturn(ea);
-    when(appCtx.getEventHandler()).thenReturn(ea);
-    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
-    policy.init(appCtx);
-    TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
+
+    listener = new MockTaskAttemptListenerImpl(
         appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
@@ -262,26 +300,17 @@ public class TestTaskAttemptListenerImpl {
   public void testCommitWindow() throws IOException {
     SystemClock clock = SystemClock.getInstance();
 
+    configureMocks();
+
     org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
         mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
     when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
     Job mockJob = mock(Job.class);
     when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
-    AppContext appCtx = mock(AppContext.class);
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
     when(appCtx.getClock()).thenReturn(clock);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    Dispatcher dispatcher = mock(Dispatcher.class);
-    @SuppressWarnings("unchecked")
-    EventHandler<Event> ea = mock(EventHandler.class);
-    when(dispatcher.getEventHandler()).thenReturn(ea);
-    when(appCtx.getEventHandler()).thenReturn(ea);
-    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
-    policy.init(appCtx);
-    TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
+
+    listener = new MockTaskAttemptListenerImpl(
         appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
@@ -300,44 +329,29 @@ public class TestTaskAttemptListenerImpl {
     verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
 
     // verify commit allowed when RM heartbeat is recent
-    when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+    when(rmHeartbeatHandler.getLastHeartbeatTime())
+      .thenReturn(clock.getTime());
     canCommit = listener.canCommit(tid);
     assertTrue(canCommit);
     verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
-
-    listener.stop();
   }
 
   @Test
   public void testCheckpointIDTracking()
     throws IOException, InterruptedException{
-
     SystemClock clock = SystemClock.getInstance();
 
+    configureMocks();
+
     org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
         mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
     when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
     Job mockJob = mock(Job.class);
     when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
-
-    Dispatcher dispatcher = mock(Dispatcher.class);
-    @SuppressWarnings("unchecked")
-    EventHandler<Event> ea = mock(EventHandler.class);
-    when(dispatcher.getEventHandler()).thenReturn(ea);
-
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-
-    AppContext appCtx = mock(AppContext.class);
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
     when(appCtx.getClock()).thenReturn(clock);
-    when(appCtx.getEventHandler()).thenReturn(ea);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    when(appCtx.getEventHandler()).thenReturn(ea);
-    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
-    policy.init(appCtx);
-    TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
+
+    listener = new MockTaskAttemptListenerImpl(
         appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
@@ -387,42 +401,13 @@ public class TestTaskAttemptListenerImpl {
 
     //assert it worked
     assert outcid == incid;
-
-    listener.stop();
-
   }
 
-  @SuppressWarnings("rawtypes")
   @Test
   public void testStatusUpdateProgress()
       throws IOException, InterruptedException {
-    AppContext appCtx = mock(AppContext.class);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    Dispatcher dispatcher = mock(Dispatcher.class);
-    @SuppressWarnings("unchecked")
-    EventHandler<Event> ea = mock(EventHandler.class);
-    when(dispatcher.getEventHandler()).thenReturn(ea);
-
-    when(appCtx.getEventHandler()).thenReturn(ea);
-    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
-    policy.init(appCtx);
-    MockTaskAttemptListenerImpl listener =
-      new MockTaskAttemptListenerImpl(appCtx, secret,
-          rmHeartbeatHandler, hbHandler, policy);
-    Configuration conf = new Configuration();
-    listener.init(conf);
-    listener.start();
-    JVMId id = new JVMId("foo",1, true, 1);
-    WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
-
-    TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
-    TaskAttemptId attemptId = TypeConverter.toYarn(attemptID);
-    Task task = mock(Task.class);
-    listener.registerPendingTask(task, wid);
-    listener.registerLaunchedTask(attemptId, wid);
+    configureMocks();
+    startListener(true);
     verify(hbHandler).register(attemptId);
 
     // make sure a ping doesn't report progress
@@ -437,6 +422,116 @@ public class TestTaskAttemptListenerImpl {
     feedback = listener.statusUpdate(attemptID, mockStatus);
     assertTrue(feedback.getTaskFound());
     verify(hbHandler).progressing(eq(attemptId));
-    listener.close();
+  }
+
+  @Test
+  public void testSingleStatusUpdate()
+      throws IOException, InterruptedException {
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+
+    verify(ea).handle(eventCaptor.capture());
+    TaskAttemptStatusUpdateEvent updateEvent =
+        (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
+
+    TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
+    assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
+    assertEquals(1, status.fetchFailedMaps.size());
+    assertEquals(Phase.SHUFFLE, status.phase);
+  }
+
+  @Test
+  public void testStatusUpdateEventCoalescing()
+      throws IOException, InterruptedException {
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+    listener.statusUpdate(attemptID, secondReduceStatus);
+
+    verify(ea).handle(any(Event.class));
+    ConcurrentMap<TaskAttemptId,
+        AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
+        listener.getAttemptIdToStatus();
+    TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
+
+    assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
+    assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
+    assertEquals(2, status.fetchFailedMaps.size());
+    assertEquals(Phase.SORT, status.phase);
+  }
+
+  @Test
+  public void testCoalescedStatusUpdatesCleared()
+      throws IOException, InterruptedException {
+    // First two events are coalesced, the third is not
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+    listener.statusUpdate(attemptID, secondReduceStatus);
+    ConcurrentMap<TaskAttemptId,
+        AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
+        listener.getAttemptIdToStatus();
+    attemptIdToStatus.get(attemptId).set(null);
+    listener.statusUpdate(attemptID, thirdReduceStatus);
+
+    verify(ea, times(2)).handle(eventCaptor.capture());
+    TaskAttemptStatusUpdateEvent updateEvent =
+        (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
+
+    TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
+    assertNull(status.fetchFailedMaps);
+    assertEquals(Phase.REDUCE, status.phase);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStatusUpdateFromUnregisteredTask()
+      throws IOException, InterruptedException{
+    configureMocks();
+    startListener(false);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+  }
+
+  private void configureMocks() {
+    firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
+        new Counters());
+    firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
+
+    secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
+        new Counters());
+    secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
+
+    thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "",
+        TaskStatus.Phase.REDUCE, new Counters());
+
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    listener = new MockTaskAttemptListenerImpl(appCtx, secret,
+          rmHeartbeatHandler, hbHandler, policy);
+    id = new JVMId("foo", 1, true, 1);
+    wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+    attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
+    attemptId = TypeConverter.toYarn(attemptID);
+  }
+
+  private void startListener(boolean registerTask) {
+    Configuration conf = new Configuration();
+
+    listener.init(conf);
+    listener.start();
+
+    if (registerTask) {
+      listener.registerPendingTask(task, wid);
+      listener.registerLaunchedTask(attemptId, wid);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
index cb2a29e..67a8901 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
@@ -442,7 +443,7 @@ public class TestFetchFailure {
     status.stateString = "OK";
     status.taskState = attempt.getState();
     TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
-        status);
+        new AtomicReference<>(status));
     app.getContext().getEventHandler().handle(event);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
index 77f9a09..ca3c28c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Assert;
 
@@ -103,7 +104,8 @@ public class TestMRClientService {
     taskAttemptStatus.phase = Phase.MAP;
     // send the status update
     app.getContext().getEventHandler().handle(
-        new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
+        new TaskAttemptStatusUpdateEvent(attempt.getID(),
+            new AtomicReference<>(taskAttemptStatus)));
 
     
     //verify that all object are fully populated by invoking RPCs.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index e8003c0..de171c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
@@ -84,7 +85,8 @@ public class TestSpeculativeExecutionWithMRApp {
             createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
               TaskAttemptState.RUNNING);
         TaskAttemptStatusUpdateEvent event =
-            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+                new AtomicReference<>(status));
         appEventHandler.handle(event);
       }
     }
@@ -155,7 +157,8 @@ public class TestSpeculativeExecutionWithMRApp {
             createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
               TaskAttemptState.RUNNING);
         TaskAttemptStatusUpdateEvent event =
-            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+                new AtomicReference<>(status));
         appEventHandler.handle(event);
       }
     }
@@ -180,7 +183,8 @@ public class TestSpeculativeExecutionWithMRApp {
                 TaskAttemptState.RUNNING);
           speculatedTask = task.getValue();
           TaskAttemptStatusUpdateEvent event =
-              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+                  new AtomicReference<>(status));
           appEventHandler.handle(event);
         }
       }
@@ -195,7 +199,8 @@ public class TestSpeculativeExecutionWithMRApp {
               createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
                 TaskAttemptState.RUNNING);
           TaskAttemptStatusUpdateEvent event =
-              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+                  new AtomicReference<>(status));
           appEventHandler.handle(event);
         }
       }

