You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2016/09/29 15:51:54 UTC
hadoop git commit: MAPREDUCE-6771. RMContainerAllocator sends
container diagnostics event after corresponding completion event. Contributed
by Haibo Chen (cherry picked from commit
c52ad9ee86f5033caca02a7af6aeccfc5c87a99e)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 1c8e38818 -> db93a64ec
MAPREDUCE-6771. RMContainerAllocator sends container diagnostics event after corresponding completion event. Contributed by Haibo Chen
(cherry picked from commit c52ad9ee86f5033caca02a7af6aeccfc5c87a99e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db93a64e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db93a64e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db93a64e
Branch: refs/heads/branch-2.8
Commit: db93a64eca2db78a19dff8c1eeba6f76c1e55b90
Parents: 1c8e388
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 29 15:50:50 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 29 15:51:36 2016 +0000
----------------------------------------------------------------------
.../v2/app/rm/RMContainerAllocator.java | 47 ++++++++++++--------
.../v2/app/rm/TestRMContainerAllocator.java | 45 +++++++++++++++++++
2 files changed, 74 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db93a64e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 4e28ae9..df5f500 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -142,7 +142,7 @@ public class RMContainerAllocator extends RMContainerRequestor
new LinkedList<ContainerRequest>();
//holds information about the assigned containers to task attempts
- private final AssignedRequests assignedRequests = new AssignedRequests();
+ private final AssignedRequests assignedRequests;
//holds scheduled requests to be fulfilled by RM
private final ScheduledRequests scheduledRequests = new ScheduledRequests();
@@ -188,6 +188,11 @@ public class RMContainerAllocator extends RMContainerRequestor
super(clientService, context);
this.stopped = new AtomicBoolean(false);
this.clock = context.getClock();
+ this.assignedRequests = createAssignedRequests();
+ }
+
+ protected AssignedRequests createAssignedRequests() { // factory hook called from the constructor; overridable so a subclass can supply its own instance
+ return new AssignedRequests();
}
@Override
@@ -797,27 +802,33 @@ public class RMContainerAllocator extends RMContainerRequestor
handleJobPriorityChange(response);
for (ContainerStatus cont : finishedContainers) {
- LOG.info("Received completed container " + cont.getContainerId());
- TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
- if (attemptID == null) {
- LOG.error("Container complete event for unknown container id "
- + cont.getContainerId());
- } else {
- pendingRelease.remove(cont.getContainerId());
- assignedRequests.remove(attemptID);
-
- // send the container completed event to Task attempt
- eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
-
- // Send the diagnostics
- String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
- eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
- diagnostics));
- }
+ processFinishedContainer(cont);
}
return newContainers;
}
+ @SuppressWarnings("unchecked")
+ @VisibleForTesting
+ void processFinishedContainer(ContainerStatus container) { // handles a single finished container; not private so tests can call it directly
+ LOG.info("Received completed container " + container.getContainerId());
+ TaskAttemptId attemptID = assignedRequests.get(container.getContainerId());
+ if (attemptID == null) { // no task attempt is recorded for this container: log and do nothing else
+ LOG.error("Container complete event for unknown container "
+ + container.getContainerId());
+ } else {
+ pendingRelease.remove(container.getContainerId());
+ assignedRequests.remove(attemptID);
+
+ // Send the diagnostics first, so the diagnostics update event is handled before the completion event (MAPREDUCE-6771)
+ String diagnostic = StringInterner.weakIntern(container.getDiagnostics());
+ eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
+ diagnostic));
+
+ // Then send the container completed event to the task attempt
+ eventHandler.handle(createContainerFinishedEvent(container, attemptID));
+ }
+ }
+
private void applyConcurrentTaskLimits() {
int numScheduledMaps = scheduledRequests.maps.size();
if (maxRunningMaps > 0 && numScheduledMaps > 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db93a64e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e3999a4..e550971 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@@ -69,6 +71,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
@@ -142,6 +145,7 @@ import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Supplier;
+import org.mockito.InOrder;
@SuppressWarnings("unchecked")
public class TestRMContainerAllocator {
@@ -3011,6 +3015,47 @@ public class TestRMContainerAllocator {
}
}
+ /**
+ * MAPREDUCE-6771. Verifies that, for a finished container, RMContainerAllocator
+ * hands the diagnostics update event to the event handler before the completion event.
+ */
+ @Test
+ public void testHandlingFinishedContainers() {
+ EventHandler eventHandler = mock(EventHandler.class);
+
+ AppContext context = mock(MRAppMaster.RunningAppContext.class);
+ when(context.getClock()).thenReturn(new ControlledClock());
+ when(context.getClusterInfo()).thenReturn(
+ new ClusterInfo(Resource.newInstance(10240, 1)));
+ when(context.getEventHandler()).thenReturn(eventHandler);
+ RMContainerAllocator containerAllocator =
+ new RMContainerAllocatorForFinishedContainer(null, context);
+
+ ContainerStatus finishedContainer = ContainerStatus.newInstance(
+ mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
+ containerAllocator.processFinishedContainer(finishedContainer);
+
+ InOrder inOrder = inOrder(eventHandler); // checks the relative order of handle() calls on the mocked handler
+ inOrder.verify(eventHandler).handle(
+ isA(TaskAttemptDiagnosticsUpdateEvent.class));
+ inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class));
+ inOrder.verifyNoMoreInteractions(); // no further events are expected after those two
+ }
+
+ private static class RMContainerAllocatorForFinishedContainer
+ extends RMContainerAllocator { // test-only allocator whose AssignedRequests is replaced by a mock
+ public RMContainerAllocatorForFinishedContainer(ClientService clientService,
+ AppContext context) {
+ super(clientService, context);
+ }
+ @Override
+ protected AssignedRequests createAssignedRequests() {
+ AssignedRequests assignedReqs = mock(AssignedRequests.class);
+ TaskAttemptId taskAttempt = mock(TaskAttemptId.class);
+ when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt); // every container id resolves to the mock attempt, so the lookup never returns null
+ return assignedReqs;
+ }
+ }
@Test
public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org