You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2017/12/01 20:09:16 UTC
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop
Updated Branches:
refs/heads/trunk 0faf50624 -> 21d362735
MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273
Branch: refs/heads/trunk
Commit: 21d36273551fa45c4130e5523b6724358cf34b1e
Parents: 0faf506
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Dec 1 14:03:01 2017 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Dec 1 14:04:25 2017 -0600
----------------------------------------------------------------------
.../hadoop/mapred/TaskAttemptListenerImpl.java | 69 +++-
.../job/event/TaskAttemptStatusUpdateEvent.java | 12 +-
.../v2/app/job/impl/TaskAttemptImpl.java | 20 +-
.../mapred/TestTaskAttemptListenerImpl.java | 315 ++++++++++++-------
.../mapreduce/v2/app/TestFetchFailure.java | 3 +-
.../mapreduce/v2/app/TestMRClientService.java | 4 +-
.../v2/TestSpeculativeExecutionWithMRApp.java | 13 +-
7 files changed, 302 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 9b6148c..67f8ff0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This class is responsible for talking to the task umblical.
* It also converts all the old data structures
@@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory;
* This class HAS to be in this package to access package private
* methods/classes.
*/
-@SuppressWarnings({"unchecked"})
public class TaskAttemptListenerImpl extends CompositeService
implements TaskUmbilicalProtocol, TaskAttemptListener {
@@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService
private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
jvmIDToActiveAttemptMap
= new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
+
+ private ConcurrentMap<TaskAttemptId,
+ AtomicReference<TaskAttemptStatus>> attemptIdToStatus
+ = new ConcurrentHashMap<>();
+
private Set<WrappedJvmID> launchedJVMs = Collections
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
@@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID);
+ AtomicReference<TaskAttemptStatus> lastStatusRef =
+ attemptIdToStatus.get(yarnAttemptID);
+ if (lastStatusRef == null) {
+ throw new IllegalStateException("Status update was called"
+ + " with illegal TaskAttemptId: " + yarnAttemptID);
+ }
+
AMFeedback feedback = new AMFeedback();
feedback.setTaskFound(true);
@@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService
// // isn't ever changed by the Task itself.
// taskStatus.getIncludeCounters();
- context.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
- taskAttemptStatus));
+ coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef);
+
return feedback;
}
@@ -520,6 +535,8 @@ public class TaskAttemptListenerImpl extends CompositeService
launchedJVMs.add(jvmId);
taskHeartbeatHandler.register(attemptID);
+
+ attemptIdToStatus.put(attemptID, new AtomicReference<>());
}
@Override
@@ -541,6 +558,8 @@ public class TaskAttemptListenerImpl extends CompositeService
//unregister this attempt
taskHeartbeatHandler.unregister(attemptID);
+
+ attemptIdToStatus.remove(attemptID);
}
@Override
@@ -563,4 +582,46 @@ public class TaskAttemptListenerImpl extends CompositeService
preemptionPolicy.setCheckpointID(tid, cid);
}
+  /**
+   * Publishes a status update for an attempt, coalescing it with any update
+   * that the dispatcher has not consumed yet.
+   *
+   * <p>{@code lastStatusRef} holds the newest unconsumed status, or
+   * {@code null} once TaskAttemptImpl's status transition has taken it with
+   * {@code getAndSet(null)}. A TA_UPDATE event is dispatched only by the
+   * caller whose compare-and-set moves the slot from {@code null} to
+   * non-null. That keeps exactly one pending event per non-null value, even
+   * when several handlers report for the same attempt concurrently. An
+   * unconditional {@code set()} could queue two events for one value, and
+   * the second consumer would then read {@code null}.
+   *
+   * @param yarnAttemptID the attempt being updated (not used by the current
+   *                      implementation)
+   * @param taskAttemptStatus the freshly reported status; its
+   *                          {@code fetchFailedMaps} may be replaced by a
+   *                          merged list before it is published
+   * @param lastStatusRef the per-attempt slot shared with the dispatcher
+   */
+  private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
+      TaskAttemptStatus taskAttemptStatus,
+      AtomicReference<TaskAttemptStatus> lastStatusRef) {
+    // Fetch failures reported in this heartbeat only. This list is never
+    // mutated, so every retry below restarts the merge from a clean state.
+    List<TaskAttemptId> reportedFetchFailedMaps =
+        taskAttemptStatus.fetchFailedMaps;
+    TaskAttemptStatus lastStatus;
+
+    do {
+      lastStatus = lastStatusRef.get();
+      // taskAttemptStatus is not yet visible to other threads, so rewriting
+      // its field between attempts is safe.
+      taskAttemptStatus.fetchFailedMaps =
+          mergeFetchFailedMaps(reportedFetchFailedMaps, lastStatus);
+      // The slot may be changed concurrently by the dispatcher (consuming
+      // it) or by another handler (coalescing into it): retry against the
+      // current value instead of overwriting it.
+    } while (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus));
+
+    // Only the update that filled an empty slot owes the dispatcher an
+    // event; otherwise an event is already queued and will pick this up.
+    if (lastStatus == null) {
+      context.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
+              lastStatusRef));
+    }
+  }
+
+  /**
+   * Returns the newly reported fetch failures followed by those still
+   * pending in {@code lastStatus}, without modifying either list. When there
+   * is nothing to merge, one of the inputs (possibly {@code null}) is
+   * returned as-is.
+   */
+  private static List<TaskAttemptId> mergeFetchFailedMaps(
+      List<TaskAttemptId> reported, TaskAttemptStatus lastStatus) {
+    if (lastStatus == null || lastStatus.fetchFailedMaps == null) {
+      return reported;
+    }
+    if (reported == null) {
+      return lastStatus.fetchFailedMaps;
+    }
+    List<TaskAttemptId> merged = new ArrayList<>(
+        reported.size() + lastStatus.fetchFailedMaps.size());
+    merged.addAll(reported);
+    merged.addAll(lastStatus.fetchFailedMaps);
+    return merged;
+  }
+
+  /**
+   * Exposes the per-attempt pending-status slots so tests can inspect or
+   * reset them; not meant for production callers.
+   */
+  @VisibleForTesting
+  ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>>
+      getAttemptIdToStatus() {
+    return this.attemptIdToStatus;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
index 715f63d..cef4fd0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@@ -26,17 +27,16 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
-
- private TaskAttemptStatus reportedTaskAttemptStatus;
+  /**
+   * Slot shared with TaskAttemptListenerImpl that holds the newest status
+   * not yet consumed by the attempt's state machine.
+   */
+  private final AtomicReference<TaskAttemptStatus> taskAttemptStatusRef;
+
+  /**
+   * @param id the attempt this update belongs to
+   * @param taskAttemptStatusRef slot holding the latest unprocessed status
+   */
public TaskAttemptStatusUpdateEvent(TaskAttemptId id,
- TaskAttemptStatus taskAttemptStatus) {
+      AtomicReference<TaskAttemptStatus> taskAttemptStatusRef) {
super(id, TaskAttemptEventType.TA_UPDATE);
- this.reportedTaskAttemptStatus = taskAttemptStatus;
+    this.taskAttemptStatusRef = taskAttemptStatusRef;
}
+  /**
+   * Returns the shared status slot rather than a snapshot. TaskAttemptImpl
+   * takes the value with {@code getAndSet(null)}, so {@code get()} may
+   * later observe {@code null} or a newer, coalesced status.
+   */
- public TaskAttemptStatus getReportedTaskAttemptStatus() {
- return reportedTaskAttemptStatus;
+  public AtomicReference<TaskAttemptStatus> getTaskAttemptStatusRef() {
+    return taskAttemptStatusRef;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 90e0d21..431128b 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1780,7 +1781,6 @@ public abstract class TaskAttemptImpl implements
taskAttempt.updateProgressSplits();
}
-
static class RequestContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final boolean rescheduled;
@@ -1965,6 +1965,7 @@ public abstract class TaskAttemptImpl implements
// register it to TaskAttemptListener so that it can start monitoring it.
taskAttempt.taskAttemptListener
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
+
//TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
@@ -2430,15 +2431,20 @@ public abstract class TaskAttemptImpl implements
}
private static class StatusUpdater
- implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
- // Status update calls don't really change the state of the attempt.
+ TaskAttemptStatusUpdateEvent statusEvent =
+ ((TaskAttemptStatusUpdateEvent)event);
+
+ AtomicReference<TaskAttemptStatus> taskAttemptStatusRef =
+ statusEvent.getTaskAttemptStatusRef();
+
TaskAttemptStatus newReportedStatus =
- ((TaskAttemptStatusUpdateEvent) event)
- .getReportedTaskAttemptStatus();
+ taskAttemptStatusRef.getAndSet(null);
+
// Now switch the information in the reportedStatus
taskAttempt.reportedStatus = newReportedStatus;
taskAttempt.reportedStatus.taskState = taskAttempt.getState();
@@ -2447,12 +2453,10 @@ public abstract class TaskAttemptImpl implements
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-
taskAttempt.updateProgressSplits();
-
//if fetch failures are present, send the fetch failure event to job
//this only will happen in reduce attempt type
- if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
+ if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
String hostname = taskAttempt.container == null ? "UNKNOWN"
: taskAttempt.container.getNodeId().getHost();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index fa8418a..4ff6fb2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -35,6 +37,7 @@ import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -42,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -52,12 +57,69 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.SystemClock;
-
+import org.junit.After;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+/**
+ * Tests the behavior of TaskAttemptListenerImpl.
+ */
+@RunWith(MockitoJUnitRunner.class)
public class TestTaskAttemptListenerImpl {
+ private static final String ATTEMPT1_ID =
+ "attempt_123456789012_0001_m_000001_0";
+ private static final String ATTEMPT2_ID =
+ "attempt_123456789012_0001_m_000002_0";
+
+ private static final TaskAttemptId TASKATTEMPTID1 =
+ TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
+ private static final TaskAttemptId TASKATTEMPTID2 =
+ TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
+
+ @Mock
+ private AppContext appCtx;
+
+ @Mock
+ private JobTokenSecretManager secret;
+
+ @Mock
+ private RMHeartbeatHandler rmHeartbeatHandler;
+
+ @Mock
+ private TaskHeartbeatHandler hbHandler;
+
+ @Mock
+ private Dispatcher dispatcher;
+
+ @Mock
+ private Task task;
+
+ @SuppressWarnings("rawtypes")
+ @Mock
+ private EventHandler<Event> ea;
+
+ @SuppressWarnings("rawtypes")
+ @Captor
+ private ArgumentCaptor<Event> eventCaptor;
+
+ private CheckpointAMPreemptionPolicy policy;
+ private JVMId id;
+ private WrappedJvmID wid;
+ private TaskAttemptID attemptID;
+ private TaskAttemptId attemptId;
+ private ReduceTaskStatus firstReduceStatus;
+ private ReduceTaskStatus secondReduceStatus;
+ private ReduceTaskStatus thirdReduceStatus;
+
+ private MockTaskAttemptListenerImpl listener;
+
public static class MockTaskAttemptListenerImpl
extends TaskAttemptListenerImpl {
@@ -93,34 +155,24 @@ public class TestTaskAttemptListenerImpl {
//Empty
}
}
-
+
+  /** Closes and forgets the listener created by the test, if there is one. */
+  @After
+  public void after() throws IOException {
+    if (listener == null) {
+      return;
+    }
+    listener.close();
+    listener = null;
+  }
+
@Test (timeout=5000)
public void testGetTask() throws IOException {
- AppContext appCtx = mock(AppContext.class);
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
- RMHeartbeatHandler rmHeartbeatHandler =
- mock(RMHeartbeatHandler.class);
- TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- Dispatcher dispatcher = mock(Dispatcher.class);
- @SuppressWarnings("unchecked")
- EventHandler<Event> ea = mock(EventHandler.class);
- when(dispatcher.getEventHandler()).thenReturn(ea);
-
- when(appCtx.getEventHandler()).thenReturn(ea);
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
- policy.init(appCtx);
- MockTaskAttemptListenerImpl listener =
- new MockTaskAttemptListenerImpl(appCtx, secret,
- rmHeartbeatHandler, hbHandler, policy);
- Configuration conf = new Configuration();
- listener.init(conf);
- listener.start();
- JVMId id = new JVMId("foo",1, true, 1);
- WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+ configureMocks();
+ startListener(false);
// Verify ask before registration.
//The JVM ID has not been registered yet so we should kill it.
JvmContext context = new JvmContext();
+
context.jvmId = id;
JvmTask result = listener.getTask(context);
assertNotNull(result);
@@ -128,20 +180,18 @@ public class TestTaskAttemptListenerImpl {
// Verify ask after registration but before launch.
// Don't kill, should be null.
- TaskAttemptId attemptID = mock(TaskAttemptId.class);
- Task task = mock(Task.class);
//Now put a task with the ID
listener.registerPendingTask(task, wid);
result = listener.getTask(context);
assertNull(result);
// Unregister for more testing.
- listener.unregister(attemptID, wid);
+ listener.unregister(attemptId, wid);
// Verify ask after registration and launch
//Now put a task with the ID
listener.registerPendingTask(task, wid);
- listener.registerLaunchedTask(attemptID, wid);
- verify(hbHandler).register(attemptID);
+ listener.registerLaunchedTask(attemptId, wid);
+ verify(hbHandler).register(attemptId);
result = listener.getTask(context);
assertNotNull(result);
assertFalse(result.shouldDie);
@@ -152,15 +202,13 @@ public class TestTaskAttemptListenerImpl {
assertNotNull(result);
assertTrue(result.shouldDie);
- listener.unregister(attemptID, wid);
+ listener.unregister(attemptId, wid);
// Verify after unregistration.
result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
- listener.stop();
-
// test JVMID
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
assertNotNull(jvmid);
@@ -206,20 +254,10 @@ public class TestTaskAttemptListenerImpl {
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
TypeConverter.fromYarn(empty));
- AppContext appCtx = mock(AppContext.class);
+ configureMocks();
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
- RMHeartbeatHandler rmHeartbeatHandler =
- mock(RMHeartbeatHandler.class);
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- Dispatcher dispatcher = mock(Dispatcher.class);
- @SuppressWarnings("unchecked")
- EventHandler<Event> ea = mock(EventHandler.class);
- when(dispatcher.getEventHandler()).thenReturn(ea);
- when(appCtx.getEventHandler()).thenReturn(ea);
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
- policy.init(appCtx);
- TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
+
+ listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
@@ -262,26 +300,17 @@ public class TestTaskAttemptListenerImpl {
public void testCommitWindow() throws IOException {
SystemClock clock = SystemClock.getInstance();
+ configureMocks();
+
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
- AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
- RMHeartbeatHandler rmHeartbeatHandler =
- mock(RMHeartbeatHandler.class);
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- Dispatcher dispatcher = mock(Dispatcher.class);
- @SuppressWarnings("unchecked")
- EventHandler<Event> ea = mock(EventHandler.class);
- when(dispatcher.getEventHandler()).thenReturn(ea);
- when(appCtx.getEventHandler()).thenReturn(ea);
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
- policy.init(appCtx);
- TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
+
+ listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
@@ -300,44 +329,29 @@ public class TestTaskAttemptListenerImpl {
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
// verify commit allowed when RM heartbeat is recent
- when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+ when(rmHeartbeatHandler.getLastHeartbeatTime())
+ .thenReturn(clock.getTime());
canCommit = listener.canCommit(tid);
assertTrue(canCommit);
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
-
- listener.stop();
}
@Test
public void testCheckpointIDTracking()
throws IOException, InterruptedException{
-
SystemClock clock = SystemClock.getInstance();
+ configureMocks();
+
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
-
- Dispatcher dispatcher = mock(Dispatcher.class);
- @SuppressWarnings("unchecked")
- EventHandler<Event> ea = mock(EventHandler.class);
- when(dispatcher.getEventHandler()).thenReturn(ea);
-
- RMHeartbeatHandler rmHeartbeatHandler =
- mock(RMHeartbeatHandler.class);
-
- AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
- when(appCtx.getEventHandler()).thenReturn(ea);
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- when(appCtx.getEventHandler()).thenReturn(ea);
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
- policy.init(appCtx);
- TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
+
+ listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
@@ -387,42 +401,13 @@ public class TestTaskAttemptListenerImpl {
//assert it worked
assert outcid == incid;
-
- listener.stop();
-
}
- @SuppressWarnings("rawtypes")
@Test
public void testStatusUpdateProgress()
throws IOException, InterruptedException {
- AppContext appCtx = mock(AppContext.class);
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
- RMHeartbeatHandler rmHeartbeatHandler =
- mock(RMHeartbeatHandler.class);
- TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- Dispatcher dispatcher = mock(Dispatcher.class);
- @SuppressWarnings("unchecked")
- EventHandler<Event> ea = mock(EventHandler.class);
- when(dispatcher.getEventHandler()).thenReturn(ea);
-
- when(appCtx.getEventHandler()).thenReturn(ea);
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
- policy.init(appCtx);
- MockTaskAttemptListenerImpl listener =
- new MockTaskAttemptListenerImpl(appCtx, secret,
- rmHeartbeatHandler, hbHandler, policy);
- Configuration conf = new Configuration();
- listener.init(conf);
- listener.start();
- JVMId id = new JVMId("foo",1, true, 1);
- WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
-
- TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
- TaskAttemptId attemptId = TypeConverter.toYarn(attemptID);
- Task task = mock(Task.class);
- listener.registerPendingTask(task, wid);
- listener.registerLaunchedTask(attemptId, wid);
+ configureMocks();
+ startListener(true);
verify(hbHandler).register(attemptId);
// make sure a ping doesn't report progress
@@ -437,6 +422,116 @@ public class TestTaskAttemptListenerImpl {
feedback = listener.statusUpdate(attemptID, mockStatus);
assertTrue(feedback.getTaskFound());
verify(hbHandler).progressing(eq(attemptId));
- listener.close();
+ }
+
+  /** A single update is dispatched as-is, carrying its own fetch failure. */
+  @Test
+  public void testSingleStatusUpdate()
+      throws IOException, InterruptedException {
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+
+    // exactly one event must have reached the handler
+    verify(ea).handle(eventCaptor.capture());
+    TaskAttemptStatus dispatched =
+        ((TaskAttemptStatusUpdateEvent) eventCaptor.getValue())
+            .getTaskAttemptStatusRef().get();
+
+    assertEquals(1, dispatched.fetchFailedMaps.size());
+    assertTrue(dispatched.fetchFailedMaps.contains(TASKATTEMPTID1));
+    assertEquals(Phase.SHUFFLE, dispatched.phase);
+  }
+
+  /**
+   * Two back-to-back updates yield one event whose pending status holds
+   * the latest phase and the union of both fetch failures.
+   */
+  @Test
+  public void testStatusUpdateEventCoalescing()
+      throws IOException, InterruptedException {
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+    listener.statusUpdate(attemptID, secondReduceStatus);
+
+    // the second update is folded into the pending one: still one event
+    verify(ea).handle(any(Event.class));
+    TaskAttemptStatus pending =
+        listener.getAttemptIdToStatus().get(attemptId).get();
+
+    assertEquals(Phase.SORT, pending.phase);
+    assertEquals(2, pending.fetchFailedMaps.size());
+    assertTrue(pending.fetchFailedMaps.contains(TASKATTEMPTID1));
+    assertTrue(pending.fetchFailedMaps.contains(TASKATTEMPTID2));
+  }
+
+  /**
+   * Once the pending slot is drained, the next update starts a fresh event
+   * that does not inherit earlier fetch failures.
+   */
+  @Test
+  public void testCoalescedStatusUpdatesCleared()
+      throws IOException, InterruptedException {
+    // First two events are coalesced, the third is not
+    configureMocks();
+    startListener(true);
+    ConcurrentMap<TaskAttemptId,
+        AtomicReference<TaskAttemptStatus>> statusByAttempt =
+        listener.getAttemptIdToStatus();
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+    listener.statusUpdate(attemptID, secondReduceStatus);
+    // emulate the dispatcher consuming the coalesced status
+    statusByAttempt.get(attemptId).set(null);
+    listener.statusUpdate(attemptID, thirdReduceStatus);
+
+    verify(ea, times(2)).handle(eventCaptor.capture());
+    TaskAttemptStatusUpdateEvent latestEvent =
+        (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
+    TaskAttemptStatus latest = latestEvent.getTaskAttemptStatusRef().get();
+
+    assertNull(latest.fetchFailedMaps);
+    assertEquals(Phase.REDUCE, latest.phase);
+  }
+
+  /** Updates for an attempt that was never launched must be rejected. */
+  @Test(expected = IllegalStateException.class)
+  public void testStatusUpdateFromUnregisteredTask()
+      throws IOException, InterruptedException {
+    configureMocks();
+    // listener is running, but no attempt has been registered with it
+    startListener(false);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+  }
+
+  /**
+   * Builds the per-test fixture:
+   * <ul>
+   *   <li>the JVM and attempt identifiers;</li>
+   *   <li>three RUNNING reduce statuses: SHUFFLE with a fetch failure for
+   *       attempt 1, SORT with one for attempt 2, and REDUCE with none;</li>
+   *   <li>the mocked event-handler wiring;</li>
+   *   <li>an initialised preemption policy and an unstarted listener.</li>
+   * </ul>
+   *
+   * <p>The identifiers are assigned first because the statuses are created
+   * from {@code attemptID}. Building the statuses earlier would give them a
+   * {@code null} attempt id.
+   */
+  private void configureMocks() {
+    id = new JVMId("foo", 1, true, 1);
+    wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+    attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
+    attemptId = TypeConverter.toYarn(attemptID);
+
+    firstReduceStatus = newRunningReduceStatus(TaskStatus.Phase.SHUFFLE);
+    firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
+
+    secondReduceStatus = newRunningReduceStatus(TaskStatus.Phase.SORT);
+    secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
+
+    thirdReduceStatus = newRunningReduceStatus(TaskStatus.Phase.REDUCE);
+
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    listener = new MockTaskAttemptListenerImpl(appCtx, secret,
+        rmHeartbeatHandler, hbHandler, policy);
+  }
+
+  /** Creates a RUNNING reduce status for {@code attemptID} in the given phase. */
+  private ReduceTaskStatus newRunningReduceStatus(TaskStatus.Phase phase) {
+    return new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "", phase, new Counters());
+  }
+
+  /**
+   * Initialises and starts the listener; when {@code registerTask} is true,
+   * also registers {@code task} as pending and {@code attemptId} as launched
+   * under {@code wid}.
+   */
+  private void startListener(boolean registerTask) {
+    listener.init(new Configuration());
+    listener.start();
+
+    if (!registerTask) {
+      return;
+    }
+    listener.registerPendingTask(task, wid);
+    listener.registerLaunchedTask(attemptId, wid);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
index cb2a29e..67a8901 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -442,7 +443,7 @@ public class TestFetchFailure {
status.stateString = "OK";
status.taskState = attempt.getState();
TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
- status);
+ new AtomicReference<>(status));
app.getContext().getEventHandler().handle(event);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
index 77f9a09..ca3c28c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
@@ -103,7 +104,8 @@ public class TestMRClientService {
taskAttemptStatus.phase = Phase.MAP;
// send the status update
app.getContext().getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
+ new TaskAttemptStatusUpdateEvent(attempt.getID(),
+ new AtomicReference<>(taskAttemptStatus)));
//verify that all object are fully populated by invoking RPCs.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index e8003c0..de171c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -84,7 +85,8 @@ public class TestSpeculativeExecutionWithMRApp {
createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
TaskAttemptState.RUNNING);
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+ new AtomicReference<>(status));
appEventHandler.handle(event);
}
}
@@ -155,7 +157,8 @@ public class TestSpeculativeExecutionWithMRApp {
createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
TaskAttemptState.RUNNING);
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+ new AtomicReference<>(status));
appEventHandler.handle(event);
}
}
@@ -180,7 +183,8 @@ public class TestSpeculativeExecutionWithMRApp {
TaskAttemptState.RUNNING);
speculatedTask = task.getValue();
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+ new AtomicReference<>(status));
appEventHandler.handle(event);
}
}
@@ -195,7 +199,8 @@ public class TestSpeculativeExecutionWithMRApp {
createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
TaskAttemptState.RUNNING);
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+ new AtomicReference<>(status));
appEventHandler.handle(event);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org