You are viewing a plain text version of this content. The canonical link for it ("here") was a hyperlink in the original page and is not preserved in this text.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/03/18 20:31:44 UTC
hadoop git commit: MAPREDUCE-6277. Job can post multiple history
files if attempt loses connection to the RM. Contributed by Chang Li
(cherry picked from commit 30da99cbaf36aeef38a858251ce8ffa5eb657b38)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 a08b1b570 -> 24d8c6f35
MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM. Contributed by Chang Li
(cherry picked from commit 30da99cbaf36aeef38a858251ce8ffa5eb657b38)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24d8c6f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24d8c6f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24d8c6f3
Branch: refs/heads/branch-2
Commit: 24d8c6f35506bac3ad1e8129f59df6b871b52a7d
Parents: a08b1b5
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Mar 18 19:29:56 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Mar 18 19:31:32 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/rm/RMContainerAllocator.java | 2 +-
.../v2/app/rm/TestRMContainerAllocator.java | 61 ++++++++++++++++++++
3 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d8c6f3/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5cd974c..8906809 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -210,6 +210,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-4742. Fix typo in nnbench#displayUsage. (Liang Xie via ozawa)
+ MAPREDUCE-6277. Job can post multiple history files if attempt loses
+ connection to the RM (Chang Li via jlowe)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d8c6f3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 8d35d79..99d26a2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -698,7 +698,7 @@ public class RMContainerAllocator extends RMContainerRequestor
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
- JobEventType.INTERNAL_ERROR));
+ JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Could not contact RM after " +
retryInterval + " milliseconds.");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d8c6f3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index f290037..c58cdbd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -1608,6 +1610,7 @@ public class TestRMContainerAllocator {
= new ArrayList<TaskAttemptKillEvent>();
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
= new ArrayList<JobUpdatedNodesEvent>();
+ static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
private MyResourceManager rm;
private boolean isUnregistered = false;
private AllocateResponse allocateResponse;
@@ -1630,6 +1633,8 @@ public class TestRMContainerAllocator {
taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
} else if (event instanceof JobUpdatedNodesEvent) {
jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
+ } else if (event instanceof JobEvent) {
+ jobEvents.add((JobEvent)event);
}
}
});
@@ -1782,6 +1787,18 @@ public class TestRMContainerAllocator {
}
}
+ private static class MyContainerAllocator2 extends MyContainerAllocator {
+ public MyContainerAllocator2(MyResourceManager rm, Configuration conf,
+ ApplicationAttemptId appAttemptId, Job job) {
+ super(rm, conf, appAttemptId, job);
+ }
+ @Override
+ protected AllocateResponse makeRemoteRequest() throws IOException,
+ YarnException {
+ throw new YarnRuntimeException("for testing");
+ }
+ }
+
@Test
public void testReduceScheduling() throws Exception {
int totalMaps = 10;
@@ -2307,6 +2324,50 @@ public class TestRMContainerAllocator {
}
+ @Test
+ public void testRMUnavailable()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(
+ MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
+ MyResourceManager rm1 = new MyResourceManager(conf);
+ rm1.start();
+ DrainDispatcher dispatcher =
+ (DrainDispatcher) rm1.getRMContext().getDispatcher();
+ RMApp app = rm1.submitApp(1024);
+ dispatcher.await();
+
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId =
+ app.getCurrentAppAttempt().getAppAttemptId();
+ rm1.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ MyContainerAllocator2 allocator =
+ new MyContainerAllocator2(rm1, conf, appAttemptId, mockJob);
+ allocator.jobEvents.clear();
+ try {
+ allocator.schedule();
+ Assert.fail("Should Have Exception");
+ } catch (YarnRuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
+ }
+ dispatcher.await();
+ Assert.assertEquals("Should Have 1 Job Event", 1,
+ allocator.jobEvents.size());
+ JobEvent event = allocator.jobEvents.get(0);
+ Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT));
+ }
+
@Test(timeout=60000)
public void testAMRMTokenUpdate() throws Exception {
LOG.info("Running testAMRMTokenUpdate");