You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2017/12/02 01:17:43 UTC
hadoop git commit: Revert MAPREDUCE-5124 from 2.7.5.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 e650fcf25 -> bb44d8e83
Revert MAPREDUCE-5124 from 2.7.5.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb44d8e8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb44d8e8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb44d8e8
Branch: refs/heads/branch-2.7
Commit: bb44d8e8393268a4d2b5170e49fa060ad03605dc
Parents: e650fcf
Author: Konstantin V Shvachko <sh...@apache.org>
Authored: Fri Dec 1 17:14:52 2017 -0800
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Fri Dec 1 17:14:52 2017 -0800
----------------------------------------------------------------------
.../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +-----
.../job/event/TaskAttemptStatusUpdateEvent.java | 12 +-
.../v2/app/job/impl/TaskAttemptImpl.java | 19 +-
.../mapred/TestTaskAttemptListenerImpl.java | 248 +++----------------
.../mapreduce/v2/app/TestFetchFailure.java | 3 +-
.../mapreduce/v2/app/TestMRClientService.java | 4 +-
.../v2/TestSpeculativeExecutionWithMRApp.java | 13 +-
7 files changed, 63 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 53d758d..6627604 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,11 +22,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -58,8 +55,6 @@ import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* This class is responsible for talking to the task umblical.
* It also converts all the old data structures
@@ -85,11 +80,6 @@ public class TaskAttemptListenerImpl extends CompositeService
private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
jvmIDToActiveAttemptMap
= new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
-
- private ConcurrentMap<TaskAttemptId,
- AtomicReference<TaskAttemptStatus>> attemptIdToStatus
- = new ConcurrentHashMap<>();
-
private Set<WrappedJvmID> launchedJVMs = Collections
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
@@ -339,14 +329,6 @@ public class TaskAttemptListenerImpl extends CompositeService
TaskStatus taskStatus) throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID);
-
- AtomicReference<TaskAttemptStatus> lastStatusRef =
- attemptIdToStatus.get(yarnAttemptID);
- if (lastStatusRef == null) {
- throw new IllegalStateException("Status update was called"
- + " with illegal TaskAttemptId: " + yarnAttemptID);
- }
-
taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus =
new TaskAttemptStatus();
@@ -404,8 +386,9 @@ public class TaskAttemptListenerImpl extends CompositeService
// // isn't ever changed by the Task itself.
// taskStatus.getIncludeCounters();
- coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef);
-
+ context.getEventHandler().handle(
+ new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
+ taskAttemptStatus));
return true;
}
@@ -486,9 +469,6 @@ public class TaskAttemptListenerImpl extends CompositeService
launchedJVMs.add(jvmId);
taskHeartbeatHandler.register(attemptID);
-
- attemptIdToStatus.put(attemptID,
- new AtomicReference<TaskAttemptStatus>());
}
@Override
@@ -510,8 +490,6 @@ public class TaskAttemptListenerImpl extends CompositeService
//unregister this attempt
taskHeartbeatHandler.unregister(attemptID);
-
- attemptIdToStatus.remove(attemptID);
}
@Override
@@ -520,47 +498,4 @@ public class TaskAttemptListenerImpl extends CompositeService
return ProtocolSignature.getProtocolSignature(this,
protocol, clientVersion, clientMethodsHash);
}
-
- private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
- TaskAttemptStatus taskAttemptStatus,
- AtomicReference<TaskAttemptStatus> lastStatusRef) {
- boolean asyncUpdatedNeeded = false;
- TaskAttemptStatus lastStatus = lastStatusRef.get();
-
- if (lastStatus == null) {
- lastStatusRef.set(taskAttemptStatus);
- asyncUpdatedNeeded = true;
- } else {
- List<TaskAttemptId> oldFetchFailedMaps =
- taskAttemptStatus.fetchFailedMaps;
-
- // merge fetchFailedMaps from the previous update
- if (lastStatus.fetchFailedMaps != null) {
- if (taskAttemptStatus.fetchFailedMaps == null) {
- taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
- } else {
- taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
- }
- }
-
- if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
- // update failed - async dispatcher has processed it in the meantime
- taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
- lastStatusRef.set(taskAttemptStatus);
- asyncUpdatedNeeded = true;
- }
- }
-
- if (asyncUpdatedNeeded) {
- context.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
- lastStatusRef));
- }
- }
-
- @VisibleForTesting
- ConcurrentMap<TaskAttemptId,
- AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
- return attemptIdToStatus;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
index cef4fd0..715f63d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@@ -27,16 +26,17 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
- private AtomicReference<TaskAttemptStatus> taskAttemptStatusRef;
+
+ private TaskAttemptStatus reportedTaskAttemptStatus;
public TaskAttemptStatusUpdateEvent(TaskAttemptId id,
- AtomicReference<TaskAttemptStatus> taskAttemptStatusRef) {
+ TaskAttemptStatus taskAttemptStatus) {
super(id, TaskAttemptEventType.TA_UPDATE);
- this.taskAttemptStatusRef = taskAttemptStatusRef;
+ this.reportedTaskAttemptStatus = taskAttemptStatus;
}
- public AtomicReference<TaskAttemptStatus> getTaskAttemptStatusRef() {
- return taskAttemptStatusRef;
+ public TaskAttemptStatus getReportedTaskAttemptStatus() {
+ return reportedTaskAttemptStatus;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 5a7e545..813010d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1663,7 +1662,6 @@ public abstract class TaskAttemptImpl implements
// register it to TaskAttemptListener so that it can start monitoring it.
taskAttempt.taskAttemptListener
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
-
//TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
@@ -1959,20 +1957,15 @@ public abstract class TaskAttemptImpl implements
}
private static class StatusUpdater
- implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
- TaskAttemptStatusUpdateEvent statusEvent =
- ((TaskAttemptStatusUpdateEvent)event);
-
- AtomicReference<TaskAttemptStatus> taskAttemptStatusRef =
- statusEvent.getTaskAttemptStatusRef();
-
+ // Status update calls don't really change the state of the attempt.
TaskAttemptStatus newReportedStatus =
- taskAttemptStatusRef.getAndSet(null);
-
+ ((TaskAttemptStatusUpdateEvent) event)
+ .getReportedTaskAttemptStatus();
// Now switch the information in the reportedStatus
taskAttempt.reportedStatus = newReportedStatus;
taskAttempt.reportedStatus.taskState = taskAttempt.getState();
@@ -1981,10 +1974,12 @@ public abstract class TaskAttemptImpl implements
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
+
taskAttempt.updateProgressSplits();
+
//if fetch failures are present, send the fetch failure event to job
//this only will happen in reduce attempt type
- if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
+ if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index f8a6a9e..d35d1e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -31,15 +31,14 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+
+import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -47,83 +46,15 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.SystemClock;
-import org.junit.After;
-import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-/**
- * Tests the behavior of TaskAttemptListenerImpl.
- */
-@RunWith(MockitoJUnitRunner.class)
public class TestTaskAttemptListenerImpl {
- private static final String ATTEMPT1_ID =
- "attempt_123456789012_0001_m_000001_0";
- private static final String ATTEMPT2_ID =
- "attempt_123456789012_0001_m_000002_0";
-
- private static final TaskAttemptId TASKATTEMPTID1 =
- TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
- private static final TaskAttemptId TASKATTEMPTID2 =
- TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
-
- @Mock
- private AppContext appCtx;
-
- @Mock
- private JobTokenSecretManager secret;
-
- @Mock
- private RMHeartbeatHandler rmHeartbeatHandler;
-
- @Mock
- private TaskHeartbeatHandler hbHandler;
-
- @Mock
- private Dispatcher dispatcher;
-
- @Mock
- private Task task;
-
- @SuppressWarnings("rawtypes")
- @Mock
- private EventHandler<Event> ea;
-
- @SuppressWarnings("rawtypes")
- @Captor
- private ArgumentCaptor<Event> eventCaptor;
-
- private JVMId id;
- private WrappedJvmID wid;
- private TaskAttemptID attemptID;
- private TaskAttemptId attemptId;
- private ReduceTaskStatus firstReduceStatus;
- private ReduceTaskStatus secondReduceStatus;
- private ReduceTaskStatus thirdReduceStatus;
-
- private MockTaskAttemptListenerImpl listener;
-
- /**
- * Extension of the original TaskAttemptImpl
- * for testing purposes
- */
- public static class MockTaskAttemptListenerImpl
- extends TaskAttemptListenerImpl {
+ public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
@@ -154,24 +85,26 @@ public class TestTaskAttemptListenerImpl {
//Empty
}
}
-
- @After
- public void after() throws IOException {
- if (listener != null) {
- listener.close();
- listener = null;
- }
- }
-
+
@Test (timeout=5000)
public void testGetTask() throws IOException {
- configureMocks();
- startListener(false);
+ AppContext appCtx = mock(AppContext.class);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ MockTaskAttemptListenerImpl listener =
+ new MockTaskAttemptListenerImpl(appCtx, secret,
+ rmHeartbeatHandler, hbHandler);
+ Configuration conf = new Configuration();
+ listener.init(conf);
+ listener.start();
+ JVMId id = new JVMId("foo",1, true, 1);
+ WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
// Verify ask before registration.
//The JVM ID has not been registered yet so we should kill it.
JvmContext context = new JvmContext();
-
context.jvmId = id;
JvmTask result = listener.getTask(context);
assertNotNull(result);
@@ -179,18 +112,20 @@ public class TestTaskAttemptListenerImpl {
// Verify ask after registration but before launch.
// Don't kill, should be null.
+ TaskAttemptId attemptID = mock(TaskAttemptId.class);
+ Task task = mock(Task.class);
//Now put a task with the ID
listener.registerPendingTask(task, wid);
result = listener.getTask(context);
assertNull(result);
// Unregister for more testing.
- listener.unregister(attemptId, wid);
+ listener.unregister(attemptID, wid);
// Verify ask after registration and launch
//Now put a task with the ID
listener.registerPendingTask(task, wid);
- listener.registerLaunchedTask(attemptId, wid);
- verify(hbHandler).register(attemptId);
+ listener.registerLaunchedTask(attemptID, wid);
+ verify(hbHandler).register(attemptID);
result = listener.getTask(context);
assertNotNull(result);
assertFalse(result.shouldDie);
@@ -201,13 +136,15 @@ public class TestTaskAttemptListenerImpl {
assertNotNull(result);
assertTrue(result.shouldDie);
- listener.unregister(attemptId, wid);
+ listener.unregister(attemptID, wid);
// Verify after unregistration.
result = listener.getTask(context);
assertNotNull(result);
assertTrue(result.shouldDie);
+ listener.stop();
+
// test JVMID
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
assertNotNull(jvmid);
@@ -253,11 +190,14 @@ public class TestTaskAttemptListenerImpl {
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
TypeConverter.fromYarn(empty));
- configureMocks();
+ AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
-
- listener = new MockTaskAttemptListenerImpl(
- appCtx, secret, rmHeartbeatHandler, hbHandler) {
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptListenerImpl listener =
+ new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -298,18 +238,20 @@ public class TestTaskAttemptListenerImpl {
public void testCommitWindow() throws IOException {
SystemClock clock = new SystemClock();
- configureMocks();
-
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+ AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
-
- listener = new MockTaskAttemptListenerImpl(
- appCtx, secret, rmHeartbeatHandler, hbHandler) {
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptListenerImpl listener =
+ new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -327,119 +269,11 @@ public class TestTaskAttemptListenerImpl {
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
// verify commit allowed when RM heartbeat is recent
- when(rmHeartbeatHandler.getLastHeartbeatTime())
- .thenReturn(clock.getTime());
+ when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
canCommit = listener.canCommit(tid);
assertTrue(canCommit);
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
- }
-
- @Test
- public void testSingleStatusUpdate()
- throws IOException, InterruptedException {
- configureMocks();
- startListener(true);
- listener.statusUpdate(attemptID, firstReduceStatus);
-
- verify(ea).handle(eventCaptor.capture());
- TaskAttemptStatusUpdateEvent updateEvent =
- (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
-
- TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
- assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
- assertEquals(1, status.fetchFailedMaps.size());
- assertEquals(Phase.SHUFFLE, status.phase);
- }
-
- @Test
- public void testStatusUpdateEventCoalescing()
- throws IOException, InterruptedException {
- configureMocks();
- startListener(true);
-
- listener.statusUpdate(attemptID, firstReduceStatus);
- listener.statusUpdate(attemptID, secondReduceStatus);
-
- verify(ea).handle(any(Event.class));
- ConcurrentMap<TaskAttemptId,
- AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
- listener.getAttemptIdToStatus();
- TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
-
- assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
- assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
- assertEquals(2, status.fetchFailedMaps.size());
- assertEquals(Phase.SORT, status.phase);
- }
-
- @Test
- public void testCoalescedStatusUpdatesCleared()
- throws IOException, InterruptedException {
- // First two events are coalesced, the third is not
- configureMocks();
- startListener(true);
-
- listener.statusUpdate(attemptID, firstReduceStatus);
- listener.statusUpdate(attemptID, secondReduceStatus);
- ConcurrentMap<TaskAttemptId,
- AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
- listener.getAttemptIdToStatus();
- attemptIdToStatus.get(attemptId).set(null);
- listener.statusUpdate(attemptID, thirdReduceStatus);
-
- verify(ea, times(2)).handle(eventCaptor.capture());
- TaskAttemptStatusUpdateEvent updateEvent =
- (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
-
- TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
- assertNull(status.fetchFailedMaps);
- assertEquals(Phase.REDUCE, status.phase);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testStatusUpdateFromUnregisteredTask()
- throws IOException, InterruptedException{
- configureMocks();
- startListener(false);
-
- listener.statusUpdate(attemptID, firstReduceStatus);
- }
-
- private void configureMocks() {
- firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
- TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
- new Counters());
- firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
-
- secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
- TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
- new Counters());
- secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
-
- thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
- TaskStatus.State.RUNNING, "", "RUNNING", "",
- TaskStatus.Phase.REDUCE, new Counters());
-
- when(dispatcher.getEventHandler()).thenReturn(ea);
- when(appCtx.getEventHandler()).thenReturn(ea);
- listener = new MockTaskAttemptListenerImpl(appCtx, secret,
- rmHeartbeatHandler, hbHandler);
- id = new JVMId("foo", 1, true, 1);
- wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
- attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
- attemptId = TypeConverter.toYarn(attemptID);
- }
-
- private void startListener(boolean registerTask) {
- Configuration conf = new Configuration();
-
- listener.init(conf);
- listener.start();
-
- if (registerTask) {
- listener.registerPendingTask(task, wid);
- listener.registerLaunchedTask(attemptId, wid);
- }
+ listener.stop();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
index b500712..4e4e2e7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -426,7 +425,7 @@ public class TestFetchFailure {
status.stateString = "OK";
status.taskState = attempt.getState();
TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
- new AtomicReference<>(status));
+ status);
app.getContext().getEventHandler().handle(event);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
index ca3c28c..77f9a09 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
@@ -104,8 +103,7 @@ public class TestMRClientService {
taskAttemptStatus.phase = Phase.MAP;
// send the status update
app.getContext().getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(attempt.getID(),
- new AtomicReference<>(taskAttemptStatus)));
+ new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
//verify that all object are fully populated by invoking RPCs.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index 98bd03e..d2edd19 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -85,8 +84,7 @@ public class TestSpeculativeExecutionWithMRApp {
createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
TaskAttemptState.RUNNING);
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
appEventHandler.handle(event);
}
}
@@ -157,8 +155,7 @@ public class TestSpeculativeExecutionWithMRApp {
createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
TaskAttemptState.RUNNING);
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
appEventHandler.handle(event);
}
}
@@ -183,8 +180,7 @@ public class TestSpeculativeExecutionWithMRApp {
TaskAttemptState.RUNNING);
speculatedTask = task.getValue();
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
appEventHandler.handle(event);
}
}
@@ -199,8 +195,7 @@ public class TestSpeculativeExecutionWithMRApp {
createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
TaskAttemptState.RUNNING);
TaskAttemptStatusUpdateEvent event =
- new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
- new AtomicReference<>(status));
+ new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
appEventHandler.handle(event);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org